Lock-Free Queues

One thread can write and another read—at the same time!
By Petru Marginean
July 01, 2008
URL:http://www.ddj.com/high-performance-computing/208801974
Petru is a Vice President for Morgan Stanley, where he works as a senior C++ programmer in investment banking. He can be contacted at petru.marginean@gmail.com.
This article as written assumes a sequentially consistent model. In particular, the code relies on a specific order of instructions in both the Consumer and Producer methods. However, without inserting proper memory barrier instructions, these instructions can be reordered with unpredictable results (see, for example, the classic Double-Checked Locking problem).
Another issue is using the standard std::list. While the article mentions that it is the developer's responsibility to check that reading/writing std::list::iterator is atomic, this turns out to be too restrictive. While gcc/MSVC++ 2003 has 4-byte iterators, MSVC++ 2005 has 8-byte iterators in Release Mode and 12-byte iterators in Debug Mode.
The solution to prevent this is to use memory barriers/volatile variables. The downloadable code for the article has fixed that issue.
Many thanks to Herb Sutter who signaled the issue and helped me fix the code. --P.M.
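The barrier requirement described in the note above can be expressed with std::atomic on compilers that support C++11, which postdates this article. What follows is a minimal sketch under that assumption, not the downloadable code: SpscQueue, Node, and RunProducerConsumer are hypothetical names, and the queue uses its own linked nodes instead of std::list so that the only shared variables are atomic pointers.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical single-producer/single-consumer queue built on std::atomic.
// An illustration of the barrier idea, not the article's downloadable code.
template <typename T>
class SpscQueue {
    struct Node {
        explicit Node(const T& v) : value(v), next(nullptr) {}
        T value;
        std::atomic<Node*> next;
    };

    Node* first_;              // producer only: oldest node still allocated
    Node* last_;               // producer only: newest node
    std::atomic<Node*> head_;  // written by the consumer: last consumed node

public:
    // Starts with one dummy node that counts as already consumed.
    SpscQueue() : first_(new Node(T())), last_(first_), head_(first_) {}

    ~SpscQueue() {
        while (first_ != nullptr) {
            Node* next = first_->next.load(std::memory_order_relaxed);
            delete first_;
            first_ = next;
        }
    }

    // Producer thread only.
    void Produce(const T& t) {
        Node* node = new Node(t);
        // Release: the node's contents become visible before the link does.
        last_->next.store(node, std::memory_order_release);
        last_ = node;
        // Free the nodes the consumer has already moved past.
        Node* head = head_.load(std::memory_order_acquire);
        while (first_ != head) {
            Node* next = first_->next.load(std::memory_order_relaxed);
            delete first_;
            first_ = next;
        }
    }

    // Consumer thread only.
    bool Consume(T& t) {
        Node* head = head_.load(std::memory_order_relaxed);
        // Acquire pairs with the release store in Produce().
        Node* next = head->next.load(std::memory_order_acquire);
        if (next == nullptr) return false;
        t = next->value;
        head_.store(next, std::memory_order_release);
        return true;
    }
};

// Produces 1..n on the calling thread while a second thread consumes;
// returns the sum of everything the consumer received.
long long RunProducerConsumer(int n) {
    SpscQueue<int> q;
    long long sum = 0;
    std::thread consumer([&q, &sum, n] {
        int v = 0;
        for (int received = 0; received < n; ) {
            if (q.Consume(v)) { sum += v; ++received; }
            else std::this_thread::yield();
        }
    });
    for (int i = 1; i <= n; ++i) q.Produce(i);
    consumer.join();
    return sum;
}
```

As in the article's design, only the producer allocates and frees nodes, and each shared pointer is written by exactly one thread.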
Queues can be useful in a variety of systems involving data-stream processing. Typically, you have a data source producing data (requests coming to a web server, market feeds, or digital telephony packets) at a variable pace, and you need to process the data as fast as possible so there are no losses. To do this, you can push data into a queue using one thread and process it using a different thread, which makes good use of resources on multicore processors. One thread inserts data into the queue, and the other reads/deletes elements from the queue. Your main requirement is that a high-rate data burst does not last longer than the system's ability to accumulate data while the consumer thread handles it. The queue you use has to be threadsafe to prevent race conditions when inserting/removing data from multiple threads. For obvious reasons, it is necessary that the queue's mutual exclusion mechanism add as little overhead as possible.
In this article, I present a lock-free queue (the source code for the lockfreequeue class is available online; see www.ddj.com/code/) in which one thread can write to the queue and another read from it at the same time, without any locking.
To do this, the code implements these requirements:
There is a single writer (Producer) and a single reader (Consumer). When you have multiple producers and consumers, you can still use this queue with some external locking. You cannot have multiple producers writing at the same time (or multiple consumers consuming the data simultaneously), but you can have one producer and one consumer (two threads) accessing the queue at the same time (Responsibility: developer).
When inserting/erasing to/from an std::list, the iterators for the existing elements must remain valid (Responsibility: library implementor).
Only one thread modifies the queue; the producer thread both adds/erases elements in the queue (Responsibility: library implementor).
Besides the underlying std::list used as the container, the lock-free queue class also holds two iterators pointing to the not-yet-consumed range of elements; each is modified by one thread and read by the other (Responsibility: library implementor).
Reading/writing list::iterator is atomic on the machine upon which you run the application. If they are not on your implementation of STL, you should check whether the raw pointer's operations are atomic. You could easily replace the iterators to be mentioned shortly with raw pointers in the code (Responsibility: machine).
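The "machine" requirement can be partly checked in code. The sketch below assumes a C++11 compiler, which postdates this article, and uses hypothetical helper names: std::atomic reports whether pointer reads and writes are lock-free on the platform, and a size comparison flags iterator implementations that cannot fit in a single machine word.

```cpp
#include <atomic>
#include <cassert>
#include <list>

// True when this platform reads and writes a T* atomically without a lock.
template <typename T>
bool PointerAccessIsLockFree() {
    std::atomic<T*> p(nullptr);
    return p.is_lock_free();
}

// True when std::list<T>::iterator is no larger than a raw pointer.
// Being pointer-sized is a necessary hint, not proof, of atomic access.
template <typename T>
bool ListIteratorIsPointerSized() {
    return sizeof(typename std::list<T>::iterator) <= sizeof(void*);
}
```

If ListIteratorIsPointerSized<T>() returns false on your STL (as it would for the larger debug-mode iterators), replacing the iterators with raw pointers is the safer route.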
Because I use Standard C++, the code is portable under the aforementioned "machine" assumption:
    #include <list>

    template <typename T>
    struct LockFreeQueue
    {
        LockFreeQueue();
        void Produce(const T& t);
        bool Consume(T& t);
    private:
        typedef std::list<T> TList;
        TList list;
        typename TList::iterator iHead, iTail;
    };
Considering how simple this code is, you might wonder how it can be threadsafe. The magic is due to design, not implementation. Take a look at the implementation of the Produce() and Consume() methods. The Produce() method looks like this:
    void Produce(const T& t)
    {
        list.push_back(t);               // add the new element
        iTail = list.end();              // publish it to the consumer
        list.erase(list.begin(), iHead); // drop already-consumed elements
    }
To understand how this works, mentally separate the data from LockFreeQueue into two groups:
The list and the iTail iterator, modified by the Produce() method (Producer thread).
The iHead iterator, modified by the Consume() method (Consumer thread).
Produce() is the only method that changes the list (adding new elements and erasing the consumed elements), and it is essential that only one thread ever calls Produce(): the Producer thread. The iTail iterator is manipulated only by the Producer thread, which updates it only after a new element has been added to the list. This way, by the time the Consumer thread reads iTail, the newly added element is ready to be used. The Consume() method tries to read all the elements between iHead and iTail (excluding both ends).
    bool Consume(T& t)
    {
        typename TList::iterator iNext = iHead;
        ++iNext;
        if (iNext != iTail)
        {
            iHead = iNext;
            t = *iHead;
            return true;
        }
        return false;
    }
This method reads the elements, but doesn't remove them from the list. Nor does it access the list directly, but through the iterators. They are guaranteed to remain valid after std::list is modified, so no matter what the Producer thread does to the list, you are safe to use them.
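The iterator guarantee that this design leans on can be observed in a single thread. The following is a small sketch with a hypothetical function name; it shows only that std::list keeps an iterator valid across push_back() and across erasing other elements, and says nothing about concurrent access.

```cpp
#include <cassert>
#include <list>

// Returns true if an iterator to a surviving element still refers to that
// element after the list grows and after earlier elements are erased.
bool IteratorSurvivesListChanges() {
    std::list<int> list;
    list.push_back(10);
    list.push_back(20);

    std::list<int>::iterator it = list.begin();
    ++it;                          // it refers to 20

    list.push_back(30);            // growing the list leaves it valid
    list.erase(list.begin(), it);  // erases 10 only; it is still valid

    return *it == 20 && it == list.begin() && list.size() == 2;
}
```

A std::vector would not pass the same check, because insertion can reallocate its storage; this is why the queue is built on std::list.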
The std::list maintains an element (pointed to by iHead) that is considered already read. For this algorithm to work even when the queue was just created, I add an empty T() element in the constructor of the LockFreeQueue (see Figure 1):
    LockFreeQueue()
    {
        list.push_back(T()); // dummy element, treated as already read
        iHead = list.begin();
        iTail = list.end();
    }

Figure 1: Adding an empty T() element in the constructor of the LockFreeQueue.
Consume() may fail to read an element (and return false). Unlike traditional lock-based queues, this queue works fast when the queue is not empty, but needs an external locking or polling method to wait for data. Sometimes you want to wait if there is no element available in the queue, and avoid returning false. A naive approach to waiting is:
    T Consume()
    {
        T tmp;
        while (!Consume(tmp))
            ; // busy-wait until an element arrives
        return tmp;
    }
This Consume() method will likely heat up one of your CPUs red-hot to 100-percent use if there are no elements in the queue. Nevertheless, this should have good performance when the queue is not empty. However, if you think of it, a queue that's almost never empty is a sign of systemic trouble: It means the consumer is unable to keep pace with the producer, and sooner or later, the system is doomed to die of memory exhaustion. Call this approach NAIVE_POLLING.
A friendlier Consume() function does some polling and calls some sort of sleep() or yield() function available on your system:
    T Consume(int wait_time = 1/*milliseconds*/)
    {
        T tmp;
        while (!Consume(tmp))
        {
            DoSleep(wait_time/*milliseconds*/);
        }
        return tmp;
    }
DoSleep() can be implemented using nanosleep() (POSIX) or Sleep() (Windows), or even better, using boost::thread::sleep(), which abstracts away system-dependent nomenclature. Call this approach SLEEP. Instead of simple polling, you can use more advanced techniques to signal the Consumer thread that a new element is available. I illustrate this in Listing One using a boost::condition variable.
    // The three header names were lost from the source text; the
    // Boost.Thread headers below are an assumption.
    #include <boost/thread/condition.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/xtime.hpp>

    template <typename T>
    struct WaitFreeQueue
    {
        void Produce(const T& t)
        {
            queue.Produce(t);
            cond.notify_one();
        }
        bool Consume(T& t)
        {
            return queue.Consume(t);
        }
        T Consume(int wait_time = 1/*milliseconds*/)
        {
            T tmp;
            if (Consume(tmp))
                return tmp;
            // the queue is empty, try again (possible waiting...)
            boost::mutex::scoped_lock lock(mtx);
            while (!Consume(tmp)) // line A
            {
                boost::xtime t;
                boost::xtime_get(&t, boost::TIME_UTC);
                AddMilliseconds(t, wait_time);
                cond.timed_wait(lock, t); // line B
            }
            return tmp;
        }
    private:
        LockFreeQueue<T> queue;
        boost::condition cond;
        boost::mutex mtx;
    };

    Listing One
I used timed_wait() instead of the simpler wait() to solve a possible deadlock when Produce() is called between line A and line B in Listing One. In that case, wait() would miss the notify_one() call and have to wait for the next produced element to wake up. If this element never comes (no more produced elements, or the Produce() call actually waits for Consume() to return), there's a deadlock. Call this approach TIME_WAIT.
The queue is still wait-free as long as there are elements in it. In this case, the Consumer thread does no waiting and reads data as fast as possible (even while the Producer thread is inserting new elements). Only when the queue is exhausted does locking occur.
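For readers on C++11 or later, the same timed-wait idea can be written with the standard library instead of Boost. The sketch below is an adaptation under stated assumptions, not Listing One itself: WaitingQueue, StandInQueue, WaitAndConsume, and ReceiveDelayedValue are hypothetical names, and StandInQueue is a mutex-based placeholder for LockFreeQueue so that the block compiles on its own. The blocking method has its own name because overloading Consume(T&) with Consume(int) is ambiguous when T is int.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Placeholder for the article's LockFreeQueue so the sketch is self-contained.
// It locks internally; only its Produce/Consume interface matters here.
template <typename T>
class StandInQueue {
public:
    void Produce(const T& t) {
        std::lock_guard<std::mutex> guard(mtx_);
        items_.push_back(t);
    }
    bool Consume(T& t) {
        std::lock_guard<std::mutex> guard(mtx_);
        if (items_.empty()) return false;
        t = items_.front();
        items_.pop_front();
        return true;
    }
private:
    std::mutex mtx_;
    std::deque<T> items_;
};

template <typename T, typename Queue = StandInQueue<T> >
class WaitingQueue {
public:
    void Produce(const T& t) {
        queue_.Produce(t);
        cond_.notify_one();
    }
    bool Consume(T& t) { return queue_.Consume(t); }

    // Blocks until an element is available, re-checking every wait_time.
    T WaitAndConsume(std::chrono::milliseconds wait_time =
                         std::chrono::milliseconds(1)) {
        T tmp;
        if (Consume(tmp)) return tmp;  // fast path: no mutex involved
        std::unique_lock<std::mutex> lock(mtx_);
        while (!Consume(tmp)) {
            // Bounded wait: a notify_one() that arrives between the check
            // and the wait costs at most wait_time, never a deadlock.
            cond_.wait_for(lock, wait_time);
        }
        return tmp;
    }
private:
    Queue queue_;
    std::condition_variable cond_;
    std::mutex mtx_;
};

// The consumer blocks on an empty queue until a delayed producer delivers.
int ReceiveDelayedValue(int value) {
    WaitingQueue<int> q;
    std::thread producer([&q, value] {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        q.Produce(value);
    });
    int received = q.WaitAndConsume();
    producer.join();
    return received;
}
```

Produce() never takes the outer mutex, which keeps the producer fast and is exactly why the wait has to be bounded.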
The Ping-Pong Test
To compare the three approaches (NAIVE_POLLING, SLEEP, and TIME_WAIT), I implemented a test called "Ping-Pong" that is similar to the game of table tennis (the source code is available online). In Figure 2, there are two identical queues between the threads T1 and T2. You first load one of the queues with a number of "balls," then ask each thread to read from one queue and write to the other. The result is a controlled infinite loop. By limiting the game to a fixed number of reads/writes ("shots"), you get an understanding of how the queue behaves when varying the waiting/sleep time and strategy and the number of "balls." The faster the game, the better the performance. You should also check CPU usage to see how much of it is used for real work.
"No ball" means "do nothing" (like two players waiting for the other to start). This gives you an idea of how good the queues are when there is no data—how nervous the players are. Ideally, CPU usage should be zero.
"One ball" is like the real ping-pong game: Each player shoots and waits for the other to shoot.
"Two (or more) balls" means both players could shoot at the same time, modulo collision and waiting issues.

Figure 2: The Ping-Pong test.
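The structure of the test can be sketched in a few lines. This is not the online test program: BallQueue, Play, and PingPong are hypothetical names, the queue is a simple mutex-guarded stand-in, and the players busy-poll in the NAIVE_POLLING manner. The sketch needs at least one ball to terminate, and it checks only that no ball is lost; a real run would also time the game.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical mutex-guarded queue, used only to keep the sketch short.
class BallQueue {
public:
    void Produce(int ball) {
        std::lock_guard<std::mutex> guard(mtx_);
        balls_.push_back(ball);
    }
    bool Consume(int& ball) {
        std::lock_guard<std::mutex> guard(mtx_);
        if (balls_.empty()) return false;
        ball = balls_.front();
        balls_.pop_front();
        return true;
    }
private:
    std::mutex mtx_;
    std::deque<int> balls_;
};

// One player: read from `in`, write to `out`, until `shots` balls are returned.
void Play(BallQueue& in, BallQueue& out, int shots) {
    int ball = 0;
    for (int done = 0; done < shots; ) {
        if (in.Consume(ball)) {
            out.Produce(ball);
            ++done;
        } else {
            std::this_thread::yield();  // busy polling while there is no ball
        }
    }
}

// Plays a game with `balls` >= 1 and returns how many balls remain in the
// two queues once both players have taken all their shots.
int PingPong(int balls, int shots) {
    BallQueue a, b;
    for (int i = 0; i < balls; ++i) a.Produce(i);

    std::thread t1(Play, std::ref(a), std::ref(b), shots);
    std::thread t2(Play, std::ref(b), std::ref(a), shots);
    t1.join();
    t2.join();

    int remaining = 0, ball = 0;
    while (a.Consume(ball)) ++remaining;
    while (b.Consume(ball)) ++remaining;
    return remaining;
}
```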
In a wait-free system, the more balls in the game, the better the performance gain compared to the classic locking strategy. This is because wait-free is an optimistic concurrency control method (works best when there is no contention), while classical lock-based concurrency control is pessimistic (assumes contention happens and preemptively inserts locking).
Ready to play? Here is the Ping-Pong test command line:
$> ./pingpong [strategy] [timeout] [balls] [shots]
Running the program produces the results tabulated in Figure 3:
The best combination is timed_wait() with a small wait time (1 ms in the test). It has a very fast response time and almost 0 percent CPU usage when the queue is empty.
Even when the sleep time is 0 (usleep(0)), the worst seems to be the sleep() method, especially when the queue is likely to be empty. (The number of shots in this case is 100 times smaller than in the other cases because of the long duration of the game.)
The NO_WAIT strategy (the NAIVE_POLLING approach) is fast but behaves worst when there are no balls (100-percent CPU usage to do nothing). It has the best performance when there is a single ball.

Figure 3: Ping-Pong test results.
Figure 4 presents a table with the results for a classic approach (see SafeQueue). These results show that this queue is, on average, more than four times slower than the LockFreeQueue. The slowdown comes from the synchronization between threads: Both Produce() and Consume() have to wait for each other to finish. CPU usage is almost 100 percent for this test (similar to the NO_WAIT strategy, but not even close to its performance).

Figure 4: Classic approach results.
Final Considerations
The single-threaded code below shows the value of list.size() when producing/consuming elements:
    LockFreeQueue<int> q; // list.size() == 1
    q.Produce(1);         // list.size() == 2
    int i;
    q.Consume(i);         // list.size() == still 2!
                          // Consume() doesn't modify the list
    q.Produce(i);         // list.size() == 2 again
The size of the queue is 1 if Produce() was never called and greater than 1 if any element was produced.
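The size walk-through above can be turned into a check that compiles on its own. In the sketch below, TracedQueue restates the article's queue for single-threaded use only, and Size(), SizesMatchWalkThrough(), and SizeAfterReclaim() are hypothetical additions that expose and test list.size(). The second function goes one step further and shows Produce() discarding several consumed elements at once.

```cpp
#include <cassert>
#include <cstddef>
#include <list>

// The article's queue, restated for a single-threaded walk-through.
// Size() is a hypothetical accessor added here to expose list.size().
template <typename T>
struct TracedQueue {
    TracedQueue() {
        list.push_back(T());  // dummy element, treated as already read
        iHead = list.begin();
        iTail = list.end();
    }
    void Produce(const T& t) {
        list.push_back(t);
        iTail = list.end();
        list.erase(list.begin(), iHead);  // drop already-consumed elements
    }
    bool Consume(T& t) {
        typename std::list<T>::iterator iNext = iHead;
        ++iNext;
        if (iNext != iTail) {
            iHead = iNext;
            t = *iHead;
            return true;
        }
        return false;
    }
    std::size_t Size() const { return list.size(); }
private:
    std::list<T> list;
    typename std::list<T>::iterator iHead, iTail;
};

// Replays the walk-through: sizes 1, 2, 2, 2.
bool SizesMatchWalkThrough() {
    TracedQueue<int> q;
    bool ok = (q.Size() == 1);
    q.Produce(1);
    ok = ok && (q.Size() == 2);
    int i = 0;
    ok = ok && q.Consume(i) && i == 1 && q.Size() == 2;
    q.Produce(i);
    return ok && q.Size() == 2;
}

// Produces three elements, consumes all three, then produces one more.
// The last Produce() erases the dummy and the first two consumed elements.
std::size_t SizeAfterReclaim() {
    TracedQueue<int> q;
    for (int v = 1; v <= 3; ++v) q.Produce(v);   // size grows to 4
    int i = 0;
    while (q.Consume(i)) {}                      // size stays at 4
    q.Produce(4);                                // push, then erase three
    return q.Size();
}
```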
No matter how many times Consume() is called, the list's size will stay constant. It is Produce() that is increasing the size (by 1); and if there were consumed elements, it will also delete them from the queue. In a way, Produce() acts as a simple garbage collector. The whole thread safety comes from the fact that specific data is modified from single threads only. The synchronization between threads is done using iterators (or pointers, whichever has atomic read/write operation on your machine). Also consider this code:
    usleep(1000); // sleep 1 millisecond (usleep() takes microseconds)
On the face of it, this line of code makes a thread sleep for 1 millisecond, and then continue. In reality, 1 millisecond is just a lower bound to the duration of the call.
The man page for usleep() says, "The usleep() function suspends execution of the calling process for (at least) usec microseconds. The sleep may be lengthened slightly by any system activity or by the time spent processing the call or by the granularity of system timers." The nanosleep() man page is similar: "Therefore, nanosleep() always pauses for at least the specified time; however, it can take up to 10 ms longer than specified until the process becomes runnable again."
So if the process is not scheduled under a real-time policy, there's no guarantee when your thread will be running again. I've done some tests and (to my surprise) there are situations when code such as:
cond.timed_wait(lock, x); // x = e.g. 1 millisecond
will actually wait for more than 1 second.
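You can observe the overshoot on your own machine by timing a sleep call. The following sketch assumes C++11 and uses std::this_thread::sleep_for() in place of usleep(); MeasureSleep is a hypothetical helper name. The measured value depends on the scheduler and system load, so the only property to rely on is that it is never shorter than the request.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Sleeps for `requested` and returns how long the call actually took.
std::chrono::microseconds MeasureSleep(std::chrono::microseconds requested) {
    const std::chrono::steady_clock::time_point start =
        std::chrono::steady_clock::now();
    std::this_thread::sleep_for(requested);  // "at least" this long
    return std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - start);
}
```

Calling MeasureSleep(std::chrono::microseconds(1000)) in a loop and recording the maximum gives a rough picture of the worst-case wake-up latency under your scheduling policy.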
Acknowledgments
Many thanks to Andrei Alexandrescu who took the time to review this article. Also, thanks to Radu Duta for making useful corrections.