Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer
Nowadays, high-performance server software (for example, the HTTP accelerator) in most cases runs on multicore machines. Modern hardware could provide 32, 64 or more CPU cores. In such highly concurrent environments, lock contention sometimes hurts overall system performance more than data copying, context switches and so on. Thus, moving the hottest data structures from a locked to a lock-free design can improve software performance in multicore environments significantly.
One of the hottest data structures in traditional server software is the work queue, which could have hundreds of thousands of push and pop operations per second from tens of producers and/or consumers.
The work queue is a FIFO data structure that has only two operations: push() and pop(). It usually limits its size such that pop() waits if there are no elements in the queue, and push() waits if the queue contains the maximum allowed number of elements. It is important that many threads can execute pop() and push() operations simultaneously on different CPU cores.
One of the possible work queue implementations is a ring buffer for storing pointers to the queued elements. It has good performance especially in comparison with the common non-intrusive linked list (which stores copies of values passed by the user, such as std::list). The significant thing about the ring buffer implementation is that it natively limits its size—you can only move the current position in a round-robin fashion. On the other hand, linked lists require maintaining an additional field for total queue length. With linked lists, push and pop operations have to modify the queue length in addition to element links updating, so you need to take more care with consistency in the queue for a lock-free implementation.
Basically, different CPU families provide different guarantees for ordering memory operations, and this is critical for lock-free algorithms. In this article, I concentrate on x86, as it is the most widespread architecture rather than write generic (but slower) code.
Naive Synchronized QueueFirst, let's define the interface for our queue (I use C++11 in this article):
template<class T, long Q_SIZE>
class NaiveQueue {
public:
NaiveQueue();
void push(T *x);
T *pop();
};
The queue will store T* pointers and has a maximum size of Q_SIZE.
Let's see how the queue would look in a naive locked implementation. To develop the queue, we need an array in which we place our ring buffer. We can define this as:
T *ptr_array_[Q_SIZE];
Two members of the class, head_ and tail_, will point to the head (the next position to push an element) and tail (the next item to pop) of the queue and should be initialized to zero in the class construction. We can simplify our operations on the ring buffer by defining the counters as an unsigned long. An unsigned long (which is 64-bit in length) is large enough to handle more than millions of operations per second for thousands of years. So tail_ and head_ will be defined as:
unsigned long head_;
unsigned long tail_;
This way, we can access the elements (the same for head_ and tail_) just by the following:
ptr_array_[tail_++ & Q_MASK]
where Q_MASK is defined as:
static const unsigned long Q_MASK = Q_SIZE - 1;
To get the current position in the array, we can calculate a remainder of integer division of tail_ by Q_SIZE, but rather we define Q_SIZE as a power of 2 (32768 in our case), so we can use bitwise AND between Q_MASK and tail_, which is bit faster.
Because the operations in the queue must wait if there are no elements or if the queue is full, we need two condition variables:
std::condition_variable cond_empty_;
std::condition_variable cond_overflow_;
to wait on some new elements in the queue or for some free space, respectively. Surely, we need a mutex to serialize our queue:
std::mutex mtx_;
This way, we can write push() and pop() in the following manner:
void push(T *x)
{
std::unique_lock<std::mutex> lock(mtx_);
cond_overflow_.wait(lock, [&head_, &tail_]() {
return tail_ + Q_SIZE > head_;
});
ptr_array_[head_++ & Q_MASK] = x;
cond_empty_.notify_one();
}
T *pop()
{
std::unique_lock<std::mutex> lock(mtx_);
cond_empty_.wait(lock, [&head_, &tail_]() {
return tail_ < head_;
});
T *x = ptr_array_[tail_++ & Q_MASK];
cond_overflow_.notify_one();
return x;
}
]]>
We perform both of the operations under an acquired exclusive lock using mtx_. When
the lock is acquired, we can check the current queue state: whether it is empty (and
we cannot pop any new element) or full (and we cannot push a new element).
std::condition_variable::wait()
moves the current
thread to the sleep state until
the specified predicate is true. Next, we push or pop an element and notify
the other thread (with the notify_one()
call) that we have changed the queue state.
Because we add or delete only one element at a time, only one thread
waiting
for available elements or free slots in the queue can make progress, so we
notify and wake up only one thread.
The problem with the implementation is that only one thread at single point in time can modify the queue. Moreover, mutexes and condition variables are expensive—in Linux, they are implemented by the futex(2) system call. So each time a thread needs to wait on a mutex or condition variable, that leads to a call to futex(2), which re-schedules the thread and moves it to the wait queue.
Now, let's run a basic test that just pushes and pops addresses to and from the queue in 16 producers and 16 consumers (there is a link at the end of article to the full source code). On a box with 16 Xeon cores, the test took about seven minutes:
# time ./a.out
real 6m59.219s
user 6m21.515s
sys 72m34.177s
And, strace
with the -c
and
-f
options shows that the program spends 99.98% of
the time
in the futex system call.
Hopefully, you do not have to ask the kernel for help with user-space thread synchronization. The CPU (at least in the most common architectures) provides atomic memory operations and barriers. With the operations, you can atomically:
-
Read the memory operand, modify it and write it back.
-
Read the memory operand, compare it with a value and swap it with the other value.
Memory barriers are special assembly instructions also known as fences. Fences guarantee an instruction's execution order on the local CPU and visibility order on other CPUs. Let's consider two independent data instructions, A and B, separated by fence (let's use mfence, which provides a guarantee for ordering read and write operations):
A
mfence
B
The fence guarantees that:
-
Compiler optimizations won't move A after the fence or B before the fence.
-
The CPU will execute A and B instructions in order (it normally executes instructions out of order).
-
Other CPU cores and processor packages, which work on the same bus, will see the result of instruction A before the result of instruction B.
For our queue, we need to synchronize multiple threads' access to the head_ and
tail_ fields. Actually, when you run head_++
(this
is an example of an RMW,
Read-Modify-Write, operation because the processor must read the current head_ value,
increment it locally and write it back to memory) on two cores, both
cores
could read the current head_ value simultaneously, increment it and
write the new value back simultaneously, so one increment is lost. For atomic operations,
C++11 provides the std::atomic template, which should replace the current GCC sync_
intrinsics in the future. Unfortunately, for my compiler (GCC 4.6.3 for
x86-64),
std::atomic<>
methods still generate extra fences independently on specified memory
order. So, I'll the use old GCC's intrinsics for atomic operations.
We can atomically read and increment a variable (for example, our head_) by:
__sync_fetch_and_add(&head_, 1);
This makes the CPU lock the shared memory location on which it's going to do an operation (increment, in our case). In a multiprocessor environment, processors communicate to each other to ensure that they all see the relevant data. This is known as the cache coherency protocol. By this protocol, a processor can take exclusive ownership on a memory location. However, these communications are not for free, and we should use such atomic operations carefully and only when we really need them. Otherwise, we can hurt performance significantly.
Meanwhile, plain read and write operations on memory locations execute
atomically and do not require any additional actions (like specifying the
lock
prefix to make the instruction run atomically on x86 architecture).
In our lock-free implementation, we're going to abandon the mutex mtx_ and consequently both the condition variables. However, we still need to wait if the queue is full on push and if the queue is empty on pop operations. For push, we would do this with a simple loop like we did for the locked queue:
while (tail_ + Q_SIZE < head_)
sched_yield();
sched_yield()
just lets the other thread run on the current processor. This is
the native way and the fastest way to re-schedule the current thread. However, if there is no
other thread waiting in the scheduler run queue for available CPU,
the current thread will be scheduled back immediately. Thus, we'll always see 100%
CPU usage, even if we have no data to process. To cope with this, we can use
usleep(3) with some small value.
Let's look more closely at what's going on in the loop. First, we read the tail_ value; next we read the value of head_, and after that, we make the decision whether to wait or push an element and move head_ forward. The current thread can schedule at any place during the check and even after the check. Let's consider the two-thread scenario (Table 1).
Thread 1 | Thread 2 |
read tail_ | read tail_ |
read head_ | read head_ |
(scheduled) | push an element |
push an element |
If we have only one free place in the ring buffer, we override the pointer to the oldest queued element. We can solve the problem by incrementing the shared head_ before the loop and use a temporal local variable (that is, we reserve a place to which we're going to insert an element and wait when it is free):
unsigned long tmp_head =
__sync_fetch_and_add(&head_, 1);
while (tail_ + Q_SIZE < tmp_head)
sched_yield();
ptr_array_[tmp_head & Q_MASK] = x;
We can write similar code for pop()—just swap head and tail. However, the problem still exists. Two producers can increment head_, check that they have enough space and re-schedule at the same time just before inserting x. A consumer can wake up instantly (it sees that head_ moved forward to two positions) and read a value from the queue that was not inserted yet.
Before solving the issue, let's consider the following example, where we have two producers (P1 and P2) and two consumers (C1 and C2):
LT LH
| _ | _ | _ | x | x | x | x | x | x | x | _ | _ | _ |
^ ^ ^ ^
| | | |
C1 C2 P1 P2
In this example, "_" denotes free slots and "x" denotes inserted elements. C1 and C2 are going to read values, and P1 and P2 are going to write an element to currently free slots. Let LT be the latest (lowest) tail value among all the consumers, which is stored in tmp_tail of the latest consumer, C1 above. Consumer C1 currently can work on the queue at the LT position (that is, it is in the middle of fetching the element). And, let LH correspondingly be the lowest value of tmp_head among all the producers. At each given time, we cannot push an element to a position equal to or greater than LT, and we should not try to pop an element at a position equal to or greater than LH. This means all the producers should care about the current LT value, and all consumers should care about the current LH value. So, let's introduce the two helping class members for LH and LT:
volatile unsigned long last_head_;
volatile unsigned long last_tail_;
Thus, we should check for the last_tail_ value instead of tail_ in the loop above. We need to update the values from multiple threads, but we're going to do this via plain write operations, without RMW, so the members do not have to be of the atomic type. I just specified the variables as volatile to prevent their values from caching in local processor registers.
Now the question is who should update the last_head_ and last_tail_ values, and when. We do expect that in most cases, we are able to perform push and/or pop operations on the queue without a wait. Thus, we can update the two helping variables only when we really need them—that is, inside the waiting loop. So when a producer realizes that it cannot insert a new element because of a too-small last_tail_ value, it falls into the wait loop and tries to update the last_tail_ value. To update the value, the thread must inspect the current tmp_tail of each consumer. So we need to make the temporal value visible to other threads. One possible solution is to maintain an array of tmp_tail and tmp_head values with the size equal to the number of running threads. We can do this with the following code:
struct ThrPos {
volatile unsigned long head, tail;
};
ThrPos thr_p_[std::max(n_consumers_, n_producers_)];
where n_consumers_
is the number of consumers, and
n_producers_
is the number
of
producers. We can allocate the array dynamically, but leave it statically
sized
for simplicity for now. Many threads read the elements of the array, but only
one
thread with a plain move instruction (no RMW operation) can update them,
so we also can use regular reads on the variables.
Because thr_p_ values are used only to limit moving of the current queue pointers, we initialize them to the maximum allowed values—that is, we do not limit head_ and tail_ movements until somebody pushes or pops into the queue.
We can find the lowest tail values for all the consumers with the following loop:
auto min = tail_;
for (size_t i = 0; i < n_consumers_; ++i) {
auto tmp_t = thr_p_[i].tail;
asm volatile("" ::: "memory"); // compiler barrier
if (tmp_t < min)
min = tmp_t;
1}
The temporal variable tmp_t
is required here, because we cannot atomically
compare whether thr_p_[i].tail
is less than min and update min if it is. When
we remember the current consumer's tail and compare it with min, the consumer can
move the tail. It can move it only forward, so the check in the while
condition is still correct, and we won't overwrite some live queue elements.
But, if we don't use tmp_t
, we write the code like this:
if (thr_p_[i].tail < min)
min = thr_p_[i].tail;
Then the consumer can have a lower tail value while we're comparing it with min, but moves it far forward after the comparison is done and just before the assignment. So we probably will find an incorrect minimal value.
I added the compiler barrier asm volatile("" :::
"memory)
—this is a GCC-specific compiler barrier—to
make sure that the compiler won't move thr_p_[i].tail
access and will access the memory location only once—to load its value to
tmp_t
.
One important thing about the array is that it must be indexed by the current
thread
identifier. Because POSIX threads (and consequently the C++ threads that use
them) do not use small monotonically increasing values for identifying
threads,
we need to use our own thread wrapping. I'll use the inline
thr_pos()
method
of
the queue to access the array elements:
ThrPos& thr_pos() const
{
return thr_p_[ThrId()];
}
You can find an example of the ThrId()
implementation in the source referenced at
the end of the article.
Before writing the final implementation of push() and pop(), let's go back to the initial application of our queue, the work queue. Usually, producers and consumers do a lot of work between operations with the queue. For instance, it could be a very slow IO operation. So, what happens if one consumer fetches an element from the queue and goes to sleep in the long IO operation? Its tail value will stay the same for a long time, and all the producers will wait on it over all the other consumers fully cleared the queue. This is not desired behavior.
Let's fix this in two steps. First, let's assign to the per-thread tail pointer the maximum allowed value just after fetching the element. So, we should write the following at the end of the pop() method:
T *ret = ptr_array_[thr_pos().tail & Q_MASK];
thr_pos().tail = ULONG_MAX;
return ret;
Because a producer in push() starts to find the minimal allowed value for the last_tail_ from the current value of the global tail_, it can assign the current tail_ value to last_tail_ only if there are no active consumers (this is what we want).
Generally speaking, other processors can see
thr_pos().tail
update before
the local processor reads from ptr_array_, so they can move and overwrite the
position in the array before the local processor reads it. This is possible on
processors with relaxed memory operation ordering. However, x86 provides
relatively strict memory ordering rules—particularly, it guarantees that
1) stores are not reordered with earlier loads and
2) stores are seen in consistent order by other processors.
Thus, loading from ptr_array_
and storing to
thr_pos().tail
in the code above
will be done on x86 and seen by all processors in exactly this order.
The second step is to set thr_pos().tail correctly at the beginning of pop(). We assign the current thr_pos().tail with:
thr_pos().tail = __sync_fetch_and_add(&tail_, 1);
The problem is that the operation is atomic only for tail_ shift, but not for
the thr_pos().tail assignment. So there is a time window in which
thr_pos().tail = ULONG_MAX
, and tail_ could be shifted significantly by other
consumers, so push() will set last_tail_ to the current, just incremented tail_.
So when we're are going to pop an element, we have to reserve a tail position
less than or equal to the tail_ value from which we'll pop an element:
thr_pos().tail = tail_;
thr_pos().tail = __sync_fetch_and_add(&tail_, 1);
In this code, we actually perform the following three operations:
-
Write tail_ to thr_pos().tail.
-
Increment tail_.
-
Write the previous value of tail_ to thr_pos().tail.
Again, in this general case, we have no guarantee that other processors will
"see" the results of the write operations in the same order.
Potentially, some other processor can read the incremented tail_ value first,
try to find the new last_tail_,
and only after that read the new current thread tail value. However,
__sync_fetch_and_add()
executes locked instruction,
which implies an implicit full
memory barrier on most architectures (including x86), so neither the first nor
third operations can be moved over the second one. Therefore, we also can skip
explicit memory barriers here.
Thus, if the queue is almost full, all producers will stop at or before the position of element that we're popping.
Now let's write our final implementation of the push() and pop() methods:
void push(T *ptr)
{
thr_pos().head = head_;
thr_pos().head = __sync_fetch_and_add(&head_, 1);
while (__builtin_expect(thr_pos().head >=
last_tail_ + Q_SIZE, 0))
{
::sched_yield();
auto min = tail_;
for (size_t i = 0; i < n_consumers_; ++i) {
auto tmp_t = thr_p_[i].tail;
asm volatile("" ::: "memory"); // compiler barrier
if (tmp_t < min)
min = tmp_t;
}
last_tail_ = min;
}
ptr_array_[thr_pos().head & Q_MASK] = ptr;
thr_pos().head = ULONG_MAX;
}
T *pop()
{
thr_pos().tail = tail_;
thr_pos().tail = __sync_fetch_and_add(&tail_, 1);
while (__builtin_expect(thr_pos().tail >=
last_head_, 0))
{
::sched_yield();
auto min = head_;
for (size_t i = 0; i < n_producers_; ++i) {
auto tmp_h = thr_p_[i].head;
asm volatile("" ::: "memory"); // compiler barrier
if (tmp_h < min)
min = tmp_h;
}
last_head_ = min;
}
T *ret = ptr_array_[thr_pos().tail & Q_MASK];
thr_pos().tail = ULONG_MAX;
return ret;
}
Careful readers will notice that multiple threads can scan the current head or tail values over all the producing or consuming threads. So a number of threads can find different min values and try to write them to last_head_ or last_tail_ simultaneously, so we probably would use a CAS operation here. However, atomic CAS is expensive, and the worst that can happen is that we assign too small of a value to last_head_ or last_tail_. Or, if we ever overwrite a new higher value with a smaller older value, we'll fall into sched_yield() again. Maybe we will fall to sched_yield() more frequently than if we use the synchronized CAS operation, but in practice, the cost of extra atomic operation reduces performance.
Also, I used __builtin_expect with the zero expect argument to say that we do not expect that the condition in the while statement will become true too frequently and the compiler should move the inner loop code after the code executed if the condition is false. This way, we can improve the instruction cache usage.
Finally, let's run the same test as for the naive queue:
# time ./a.out
real 1m53.566s
user 27m55.784s
sys 2m4.461s
This is 3.7 times faster than our naive queue implementation!
ConclusionNowadays, high-performance computing typically is achieved in two ways: horizontal scaling (scale-out) by adding new computational nodes and vertical scaling (scale-up) by adding extra computational resources (like CPUs or memory) to a single node. Unfortunately, linear scaling is possible only in theory. In practice, if you double your computational resources, it is likely that you get only a 30–60% performance gain. Lock contention is one of the problems that prevents efficient scale-up by adding extra CPUs. Lock-free algorithms make scale-up more productive and allow you to get more performance from multicore environments.
The code for naive and lock-free queue implementations with the tests for correctness is available at https://github.com/krizhanovsky/NatSys-Lab/blob/master/lockfree_rb_q.cc.
AcknowledgementSpecial thanks to Johann George for the final review of this article.