You will probably gain a much bigger speedup if, instead of producing and consuming byte by byte, you work in chunks. In that case the "lock-freeness" of the code will probably not matter at all; in fact, the traditional locking solution might be preferable. I'll try to demonstrate.
A lock-free, single-producer, single-consumer, bounded queue in C# is given in Listing A.
There are no esoteric interlocked operations, not even explicit memory barriers: the two volatile indices are enough, because the tail is written only by the producer and the head only by the consumer. At first glance it is as fast and as lock-free as it gets, isn't it?
Now let's compare it with a locking solution that Marc Gravell has given, here (it is the SizeQueue class used in Listing B).
We will use a dual CPU machine that has no shared L3 cache between the cores.
We expect at most a 2x speedup. A 2x speedup would mean that the lock-free solution performs ideally, at its theoretical limit.
In order to make an ideal environment for the lock-free code, we will even set the CPU affinity of the producer and the consumer thread, by using the utility class from here.
The resulting test code is in Listing B.
It produces about 10 MB (1024 pages of 10 KB each) on one thread while consuming it on another.
The queue size is fixed at 32 KB. If it is full, the producer waits.
A typical run of the test on my machine looks like this:
LockFreeByteQueue: 799ms
ByteQueue: 1843ms
The lock-free queue is faster. Wow, it is more than 2x as fast! That is something to brag about. :)
Let's look at what is happening.
Marc's locking queue does just that. It locks. It does this for every byte.
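For readers who don't have the linked code at hand, here is a minimal sketch of what a Monitor-based bounded blocking queue typically looks like. It is not a copy of Marc's class: the name BlockingBoundedQueue and its details are my assumptions, for illustration only. The point to notice is that every single Enqueue and Dequeue call takes the lock, so pushing bytes one at a time pays that price once per byte.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a locking bounded queue (NOT the linked implementation).
public class BlockingBoundedQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly int capacity;

    public BlockingBoundedQueue(int capacity)
    {
        this.capacity = capacity;
    }

    public void Enqueue(T item)
    {
        lock (items)                        // taken on every call
        {
            while (items.Count >= capacity)
                Monitor.Wait(items);        // releases the lock while waiting for space
            items.Enqueue(item);
            Monitor.PulseAll(items);        // wake any thread waiting on the queue
        }
    }

    public T Dequeue()
    {
        lock (items)                        // taken on every call
        {
            while (items.Count == 0)
                Monitor.Wait(items);        // releases the lock while waiting for data
            T item = items.Dequeue();
            Monitor.PulseAll(items);        // wake any thread waiting on the queue
            return item;
        }
    }
}
```

With T = byte, the 10 MB test goes through that lock statement about 10 million times on each side.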
Do we really need to lock for every byte and push the data byte by byte? It most assuredly arrives in chunks on the network (packets of roughly 1 KB, say). Even if it really arrives byte by byte from an internal source, the producer could easily package it into nice chunks.
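To make the "package it into chunks" idea concrete, here is a rough sketch of a producer-side helper. ChunkingWriter, its constructor parameters and the emit callback are all hypothetical names of mine, not part of any of the linked code. It collects bytes in a private buffer and hands a full buffer off in one go.

```csharp
using System;

// Hypothetical helper: collects single bytes privately and hands off full chunks.
public class ChunkingWriter
{
    private readonly int chunkSize;
    private readonly Action<byte[]> emit;   // e.g. page => queue.Enqueue(page)
    private byte[] current;
    private int count;

    public ChunkingWriter(int chunkSize, Action<byte[]> emit)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");
        if (emit == null) throw new ArgumentNullException("emit");
        this.chunkSize = chunkSize;
        this.emit = emit;
        this.current = new byte[chunkSize];
    }

    // Called once per byte; touches only private state until a chunk is full.
    public void Write(byte b)
    {
        current[count++] = b;
        if (count == chunkSize) Flush();
    }

    // Hands off whatever has been collected so far (possibly a short final chunk).
    public void Flush()
    {
        if (count == 0) return;
        byte[] chunk = current;
        if (count < chunkSize) Array.Resize(ref chunk, count);
        emit(chunk);
        current = new byte[chunkSize];      // never reuse a buffer the consumer now owns
        count = 0;
    }
}
```

The producer would create it once, for example with a chunk size of 1024 and a callback that enqueues the chunk into whichever queue is in use, call Write for every byte, and call Flush at the end of the stream.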
Let's just do that: instead of producing and consuming byte by byte, let's work in chunks and add two more tests to the micro-benchmark (Listing C, just insert it into the benchmark body).
Now a typical run looks like this:
LockFreePageQueue: 33ms
PageQueue: 25ms
Both of them are now more than 20x faster than the original lock-free code, and Marc's solution with the added chunking is even faster than the lock-free code with chunking! The queue is now touched once per 10 KB page (1024 times per side) instead of once per byte (about 10 million times per side).
Instead of going with a lock-free structure that would result in a 2x speedup, we tried another solution that works just fine with locking and resulted in a 20x(!) speedup.
The key to many problems is not so much avoiding locking - it is much more about avoiding sharing and minimizing locking. In the above case, we can avoid sharing for the duration of byte-copying.
We can work on a private structure for most of the time and then enqueue a single pointer, thereby shrinking shared space and time to a single insertion of a single pointer into a queue.
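Here is a small self-contained sketch of that hand-off pattern, for anyone who wants to try it without the linked classes. I have swapped in the framework's BlockingCollection as the bounded blocking queue (that is my substitution, not what the benchmark uses), and PageHandoffDemo and Run are made-up names. What matters is that the producer fills a private page and only then touches the shared queue, once per page.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class; BlockingCollection stands in for the linked SizeQueue.
public static class PageHandoffDemo
{
    // Produces numPages pages of pageSize bytes on one task, consumes them on the
    // calling thread, and returns the number of bytes the consumer saw.
    public static long Run(int numPages, int pageSize, int queueCapacity)
    {
        using (var queue = new BlockingCollection<byte[]>(queueCapacity))
        {
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < numPages; i++)
                {
                    byte[] page = new byte[pageSize];   // private to the producer
                    for (int j = 0; j < pageSize; j++)
                        page[j] = (byte)(i & 0xFF);
                    queue.Add(page);                    // shared state touched once per page
                }
                queue.CompleteAdding();                 // lets the consumer loop end
            });

            long bytesSeen = 0;
            foreach (byte[] page in queue.GetConsumingEnumerable())
                bytesSeen += page.Length;               // the page belongs to the consumer now

            producer.Wait();
            return bytesSeen;
        }
    }
}
```

Calling PageHandoffDemo.Run(1024, 10 * 1024, 32) mirrors the sizes used in the benchmark and returns 10485760, the total number of bytes handed over.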
Listing A, a lock-free, single producer, single consumer queue:
    public class BoundedSingleProducerSingleConsumerQueue<T>
    {
        T[] queue;
        volatile int tail;   // written only by the producer
        volatile int head;   // written only by the consumer

        public BoundedSingleProducerSingleConsumerQueue(int capacity)
        {
            // One slot always stays empty so that "full" and "empty" can be told apart.
            queue = new T[capacity + 1];
            tail = head = 0;
        }

        public bool TryEnqueue(T item)
        {
            int newtail = (tail + 1) % queue.Length;
            if (newtail == head) return false;   // full
            queue[tail] = item;
            tail = newtail;                      // publish the item to the consumer
            return true;
        }

        public bool TryDequeue(out T item)
        {
            item = default(T);
            if (head == tail) return false;      // empty
            item = queue[head];
            queue[head] = default(T);            // drop the reference so it can be collected
            head = (head + 1) % queue.Length;    // hand the slot back to the producer
            return true;
        }
    }
Listing B, a micro-benchmark:
    using System;
    using System.Diagnostics;
    using System.Threading;

    // ProcessorAffinity and SizeQueue<T> are the classes linked in the text above.
    class Program
    {
        static void Main(string[] args)
        {
            for (int numtrials = 3; numtrials > 0; --numtrials)
            {
                using (ProcessorAffinity.BeginAffinity(0))
                {
                    int pagesize = 1024 * 10;
                    int numpages = 1024;
                    int totalbytes = pagesize * numpages;

                    // Test 1: lock-free queue, one byte per operation.
                    BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                    Stopwatch sw = new Stopwatch();
                    sw.Start();
                    ThreadPool.QueueUserWorkItem(delegate(object state)
                    {
                        using (ProcessorAffinity.BeginAffinity(1))
                        {
                            for (int i = 0; i < totalbytes; i++)
                            {
                                // Spin until there is room in the queue.
                                while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
                            }
                        }
                    });
                    for (int i = 0; i < totalbytes; i++)
                    {
                        byte tmp;
                        // Spin until a byte is available.
                        while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
                    }
                    sw.Stop();
                    Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);

                    // Test 2: locking queue, one byte per operation.
                    SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                    sw.Reset();
                    sw.Start();
                    ThreadPool.QueueUserWorkItem(delegate(object state)
                    {
                        using (ProcessorAffinity.BeginAffinity(1))
                        {
                            for (int i = 0; i < totalbytes; i++)
                            {
                                byteQueue.Enqueue((byte)(i & 0xFF));
                            }
                        }
                    });
                    for (int i = 0; i < totalbytes; i++)
                    {
                        byte tmp = byteQueue.Dequeue();
                    }
                    sw.Stop();
                    Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                    Console.ReadKey();
                }
            }
        }
    }
Listing C, chunked tests:
    BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
    sw.Reset();
    sw.Start();
    ThreadPool.QueueUserWorkItem(delegate(object state)
    {
        using (ProcessorAffinity.BeginAffinity(1))
        {
            for (int i = 0; i < numpages; i++)
            {
                byte[] page = new byte[pagesize];
                for (int j = 0; j < pagesize; j++)
                {
                    page[j] = (byte)(i & 0xFF);
                }
                while (!lockfreePageQueue.TryEnqueue(page)) ;
            }
        }
    });
    for (int i = 0; i < numpages; i++)
    {
        byte[] page;
        while (!lockfreePageQueue.TryDequeue(out page)) ;
        for (int j = 0; j < pagesize; j++)
        {
            byte tmp = page[j];
        }
    }
    sw.Stop();
    Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);
    SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);
    // Start the clock before queuing the producer, as in the other three tests,
    // so that all four measurements cover the same work.
    sw.Reset();
    sw.Start();
    ThreadPool.QueueUserWorkItem(delegate(object state)
    {
        using (ProcessorAffinity.BeginAffinity(1))
        {
            for (int i = 0; i < numpages; i++)
            {
                byte[] page = new byte[pagesize];
                for (int j = 0; j < pagesize; j++)
                {
                    page[j] = (byte)(i & 0xFF);
                }
                pageQueue.Enqueue(page);
            }
        }
    });
    for (int i = 0; i < numpages; i++)
    {
        byte[] page = pageQueue.Dequeue();
        for (int j = 0; j < pagesize; j++)
        {
            byte tmp = page[j];
        }
    }
    sw.Stop();
    Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);