Having kernel threading now working, I set about researching the BSD Socket library and working out how best to fit an implementation into the kernel, particularly with a loopback interface so that I could create a server thread and a client thread and let them talk.
Unfortunately, I didn't get far down this road before I really needed some good old fashioned thread synchronisation.
The Basics
Interrupts can happen at any time, they don't care what you're doing, they will interrupt the currently running code and let some other code run for a while. That's all well and good until you're half-way through updating some structure, you get interrupted, then the next task tries to read that data and finds it broken. Bad things ensue. In SQL Server, there are lot of fanciful things such as transaction isolation levels and snapshots and whatnot to assist you in not reading half-written data unless you really want to.
The Tiniest Atom
Right at the bottom of it all, the smallest building block that we can use, is the humble exchange operator (XCHG). Often overlooked, the XCHG operator gives us the starting point to build everything else because we can use it to construct a "mutex" or mutual exclusive lock (in fact, it is one of the only instructions that can be used to build thread sync functions.)
We define a variable in memory and give it the value 0. When this variable has the value 0, the lock is open and any thread can claim it by setting it to 1. Once it is 1, no other threads can claim it until the owning thread releases it back to 0. But how can we do this knowing that the mischievous interrupts are waiting just around the corner. With XCHG of course.
When a thread wants to attempt to claim the semaphore, it puts the value 1 into a processor register, then calls XCHG to exchange the value in the register with the value of the variable in memory. The processor locks the memory bus to do this (so no other processors or cores can mess it up), and as it's a single assembly instruction, the processor cannot be interrupted in the middle. Once the thread has complete this action, the value in the register is one of two possible values:
- If the register is still 1, this means that another thread already had a lock on the mutex. The variable was 1 before we did our exchange, and our exchange didn't break anything because it swapped a 1 for a 1.
- If the register is now 0, the thread successfully claimed the mutex. The variable was 0 just now, and now it is 1. Any other thread trying to exchange after us will get a 1 and be unsuccessful.
With this coded as a function, it's quite easy to create critical regions of code which are protected by a mutex to ensure that only one thread can enter at a time. A thread performs the exchange, and if it does not obtain the mutex, it can yield() the rest of its time slice and try again the next time it is awoken. Once a thread is in this region it can still be interrupted, but no other threads can enter until it's finished.
This idea of yield()ing if a thread cannot obtain the mutex might seem clunky and inefficient, but this mutex atom is only used by the kernel to protect very small sections of code typically only a few instruction, such as adding an element to a doubly linked list, or reading and then advancing an array pointer. In order to build on this to create more useful thread synchronisation, we need a semaphore.
Semaphores
A semaphore is slightly more complicated, but still basic construct. It is a counter with two functions, "up" to increment the value of the counter by 1, and "down" to decrement the counter by 1. The interesting part about the semaphore is that no thread can reduce the value of the counter below zero. If a thread tries to do so, it is suspended until some other thread performs an "up" so that the "down" can complete.
The semaphore therefore needs a list of waiting threads, tasks which have tried to call "down" and been suspended so that when an "up" is called, a waiting thread can be released. In order to ensure that this queue works as it is supposed to, operations on the queue have to be protected by a mutex.
For completeness, a semaphore can be in one of three states:
- The counter is 0, and the waiting thread list is empty. This is the normal starting state.
- The counter is positive, and the waiting thread list is empty. The next call to "down" will decrement the counter, but will not block.
- The counter is zero, and the waiting thread list contains threads. The next call to "down" will block. The next call to "up" will release one thread from the front of the queue.
Note that the semaphore cannot have a positive counter and waiting threads, if so, something has gone very wrong.
For readability of code, I have gone with the nomenclature of using "wait()" as the function name for "down", and "signal()" as the function name for "up". This makes logical sense as threads trying to reduce the counter may have to wait(), and threads that are incrementing the counter are signal()ling a waiting thread to proceed.
Implementation
It took me a while to work out all the nuances to ensure that the code was robust and there were no holes where a mis-timed interrupt could break the state of the structure or the rules of threads waiting, but I think I got there.
A mutex protects the internal state of the semaphore, so any time signal() or wait() is called, a mutex has to be obtained before the code can proceed. The theory is that it is unlikely that a thread will be interrupted while it is in one of these methods (as they are quite short), and even more unlikely that another thread will attempt to call one of the methods on the same semaphore while a thread is already there, but if both of these do ever happen, the second thread will just yield() for one time slice then try again.
The signal() method is easy, test for which of the three states the list is in, and either dequeue a pid and set it to Runnable, or increment the counter (then release the mutex of course).
The wait() function is a little more complicated.
If the counter is positive, decrement it, release the mutex and return.
If the counter is zero, add the current pid to the list, release the mutex and sleep. The key point here is that the thread sets its state to "sleeping", then it releases the mutex, then it yield()s to start sleeping. If the thread were to release the mutex then set itself to "sleeping", another thread could come in between the two and set the thread state to "runnable" before the original thread had set it to sleep. The thread would still have been dequeued from the waiting list, so it would have slept forever.
If the counter is zero, add the current pid to the list, release the mutex and sleep. The key point here is that the thread sets its state to "sleeping", then it releases the mutex, then it yield()s to start sleeping. If the thread were to release the mutex then set itself to "sleeping", another thread could come in between the two and set the thread state to "runnable" before the original thread had set it to sleep. The thread would still have been dequeued from the waiting list, so it would have slept forever.
The last pitfall is the issue of the capacity of the queue of waiting threads. What happens if a thread calls wait(), but there isn't enough space in the list of waiting pids for it to be added. We can't return, we could sleep that thread forever, we shouldn't really kill it. My solution to this was to make the thread sleep for one second, then try enqueueing itself again. In effect, this creates a finite number of neatly queued threads in the semaphore, and a potentially infinite(!) number of threads in a "swarm", each waiting for a space in the queue. This means that no thread ever gets lost, but it could mean that the waiting threads get awoken in the wrong order. Not a big deal, just make sure the queue has enough capacity.
I changed my main() (which I use as a test-bed for all these tools I'm creating) so that the C and D threads wait() on a semaphore before writing their characters, and the B thread signals five times every 4 B's. The semaphore starts with a value of 3. The result is exactly as expected:
The B thread manages to get in a single B before the C thread gets a turn, at which point it loops through, decrementing the semaphore from its initial 3 to 0, writing out 3 C's. C's fourth call to wait() blocks. D immediately blocks on a wait() because C used up all the semaphore counts.
Four B's later, the B thread signal()s five times. The first signal sets C to be runnable, the second signal sets D to be runnable, the other three calls increment the semaphore counter to 3. When B gets pre-empted, C outputs the C for which it was awoken, then loops around and consumes the remaining 3 counts from the semaphore, outputting four C's in total before wait()ing again. Once C blocks, D outputs the one D for which it was awoken, then wait()s again.
It's hasn't been stress tested yet, but the semaphore code is complete and (I believe) interrupt-proof.
\ o /
No comments:
Post a Comment