A byte stream needs to be transferred, and there is one producer thread and one consumer thread. The producer is faster than the consumer most of the time, and I need enough buffered data for the QoS of my application. I've read about this problem and there are solutions like a shared buffer, the .NET PipeStream class ... This class is going to be instantiated many times on the server, so I need an optimized solution. Is it a good idea to use a Queue of ByteArray?

If yes, I'll use an optimization algorithm to guess the Queue size and the capacity of each ByteArray, and theoretically it fits my case.

If no, what's the best approach?

Please let me know if there's a good lock-free, thread-safe implementation of a ByteArray Queue in C# or VB.

Thanks in advance

+1  A: 

In .NET 4 there is System.Collections.Concurrent.ConcurrentQueue<T>, which is as lock-free as these things can be (while still being general).
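
A minimal usage sketch (my illustration, not part of the original answer), assuming byte[] chunks as the element type:

using System.Collections.Concurrent;

class Demo
{
    static void Main()
    {
        ConcurrentQueue<byte[]> q = new ConcurrentQueue<byte[]>();

        q.Enqueue(new byte[] { 1, 2, 3 });   // producer side; never blocks

        byte[] chunk;
        if (q.TryDequeue(out chunk))         // consumer side; false when empty
        {
            // process chunk...
        }
    }
}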

Richard
+1  A: 

Throttling is important here, by the sound of it; the BoundedBuffer class in this magazine article fits the bill. A similar class will be available in .NET 4.0 as the BlockingCollection class. Tuning the buffer size is still up to you.
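
For illustration (this sketch is mine, not from the article): with .NET 4.0's BlockingCollection, a bounded capacity gives you the throttling automatically:

using System;
using System.Collections.Concurrent;
using System.Threading;

class Demo
{
    static void Main()
    {
        // Bounded to 32 chunks: Add blocks when the buffer is full,
        // Take (and GetConsumingEnumerable) blocks when it is empty.
        BlockingCollection<byte[]> buffer = new BlockingCollection<byte[]>(32);

        Thread producer = new Thread(delegate()
        {
            for (int i = 0; i < 100; i++)
                buffer.Add(new byte[1024]);  // throttled if the consumer lags
            buffer.CompleteAdding();         // signal end of stream
        });
        producer.Start();

        foreach (byte[] chunk in buffer.GetConsumingEnumerable())
        {
            // process chunk...
        }
    }
}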

Hans Passant
-1: Yes, there is such a thing, unless you take the weird stance that atomic operations are 'locking'. In fact, if there's just one consumer, you can make a queue that won't even live-lock.
Strilanc
Useless commentary, post a real answer.
Hans Passant
@Hans: This time there is some truth in the comments. With a single producer, single consumer you can do that nicely without locks or interlocked with a circular buffer and a pair of volatiles (these are just compiler fences on x86/x64, st.rel and ld.acq on Itanic :P ). There is an unbounded example at Intel's site: http://software.intel.com/en-us/articles/single-producer-single-consumer-queue/ . I took a chance on a bounded C# one in my answer.
andras
+2  A: 

Dr. Dobb's implemented a lock-free queue in C++, which you could relatively easily adapt to C#. It works when there is exactly one producer (there can be any number of consumers).

The basic idea is to use a doubly-linked list as the underlying structure, along with movable head and tail references. When an item is produced, it gets added to the end, and everything between the beginning of the list and the current "head" is removed. To consume, attempt to move the head up: if it hits the tail, fail; if it doesn't, succeed and return the new element. The particular order of operations makes it inherently thread-safe.
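
To make the idea concrete, here is a rough C# sketch of that design (my adaptation, not the Dr. Dobb's code: it is simplified to a singly-linked list and a single consumer, the names first/divider/last are mine, and a multi-consumer version would need an interlocked head move):

public class LockFreeLinkedQueue<T>
{
    class Node
    {
        public T Value;
        public volatile Node Next;
    }

    Node first;             // producer-owned: oldest node still allocated
    volatile Node divider;  // the "head": last consumed node
    volatile Node last;     // the "tail": last produced node

    public LockFreeLinkedQueue()
    {
        // Start with a dummy sentinel node.
        first = divider = last = new Node();
    }

    // Producer only: append at the tail, then trim consumed nodes.
    public void Produce(T item)
    {
        last.Next = new Node { Value = item };
        last = last.Next;            // volatile write publishes the item
        while (first != divider)     // remove everything already consumed
            first = first.Next;
    }

    // Consumer only: move the head up; fail if it would hit the tail.
    public bool TryConsume(out T item)
    {
        if (divider != last)
        {
            item = divider.Next.Value;
            divider = divider.Next;  // mark the node consumed
            return true;
        }
        item = default(T);
        return false;
    }
}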

However, there are two major problems with using such a "lock-free" design here:

  1. There is no way to enforce an upper bound to the queue size, which might be a serious problem if your producer is faster than your consumer;

  2. By design, the Consume method must simply fail to retrieve an element if nothing has been produced. That means you need to implement your own waiting strategy for the consumer, and such a strategy is invariably either busy-waiting (which is much worse than locking, performance-wise) or timed waits (which slow down your consumer even further).

For these reasons, I'd recommend that you seriously consider whether or not you really need a lock-free structure. A lot of people come to this site thinking that it's going to be "faster" than an equivalent structure using locking, but the practical difference for most applications is so negligible that it's normally not worth the added complexity, and in some cases it can actually perform worse, because wait states (or alertable waits) are much cheaper than busy-waiting.

Multicore machines and the need for memory barriers make effective lock-free threading even more complicated; under normal operation you can still get out-of-order execution, and in .NET the jitter can further decide to reorder instructions, so you'd probably need to pepper the code with volatile variables and Thread.MemoryBarrier calls, which again might contribute toward making the lock-free version costlier than the basic synchronized version.

How about using a plain old synchronized producer-consumer queue first, and profiling your application to determine whether or not it can meet your performance requirements? There's a great, efficient P-C queue implementation over at Joseph Albahari's site. Or, as Richard mentions, if you are using the .NET 4.0 framework then you can simply use ConcurrentQueue or more likely BlockingCollection.

Test first - load test the synchronized queue, which is easy to implement - and watch how much time is actually spent locking. Not waiting, which you'd have to do anyway, but on actually acquiring and releasing the locks after they become signaled. If it's more than 1% of your program's execution time, I would be very surprised; but if so, then start looking at lock-free implementations - and make sure you profile those too, to make sure that they're actually performing better.
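
As a rough illustration of that kind of measurement (my sketch, not part of the answer), timing uncontended acquire/release pairs in a tight loop gives a baseline for the raw cost of locking:

using System;
using System.Diagnostics;

class LockCost
{
    static void Main()
    {
        object gate = new object();
        Stopwatch sw = Stopwatch.StartNew();
        for (int i = 0; i < 10000000; i++)
        {
            lock (gate) { }   // uncontended acquire + release
        }
        sw.Stop();
        // Divide by the iteration count for a per-lock estimate.
        Console.WriteLine("{0}ms for 10M lock/unlock pairs", sw.ElapsedMilliseconds);
    }
}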

Aaronaught
You should move the links to the concurrent collections way up to the top of your post. They're available for .NET 3.5 in the System.Threading assembly included in this package: http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx
280Z28
@280Z28: Thank you! I am familiar with ConcurrentQueue<T> for 4.0, but have been needing the equivalent for a 3.5 project (hence running across this question). Reactive Extensions is just what I needed!
Brad
A: 

Julian M Bucknall has written one in C#.

gabr
+1  A: 

You will probably gain much more speedup if, instead of producing and consuming byte by byte, you work in chunks. In that case, the "lock-freeness" of the code would probably not matter at all; in fact, the traditional, locking solution might be preferable. I'll try to demonstrate.

A lock-free, single-producer, single-consumer, bounded queue is given in C# (Listing A).
There are no esoteric interlocked operations and no explicit memory barriers. At first glance, it is as fast and as lock-free as it gets. Isn't it?
Now let's compare it with a locking solution that Marc Gravell has given, here.

We will use a dual-CPU machine that has no shared L3 cache between the cores. We expect at most a 2x speedup; a 2x speedup would indeed mean that the lock-free solution performs ideally, at the theoretical bound.
To create an ideal environment for the lock-free code, we will even set the CPU affinity of the producer and consumer threads, using the utility class from here.
The resulting test code is in Listing B.

The test produces ca. 10 MBytes on one thread while consuming them on another.
The queue size is fixed at 32KBytes. If it is full, the producer waits.
A typical run of the test on my machine looks like this:

LockFreeByteQueue: 799ms
ByteQueue: 1843ms

The lock-free queue is faster. Wow, it is more than 2x as fast! That is something to brag about. :)
Let's look at what is happening. Marc's locking queue does just that. It locks. It does this for every byte.

Do we really need to lock for every byte and push the data byte by byte? The data most assuredly arrives in chunks on the network (in packets of roughly 1 KB, say). Even if it really arrives byte by byte from an internal source, the producer could easily package it into nice chunks.
Let's do just that: instead of producing and consuming byte by byte, let's work in chunks, and add two other tests to the micro-benchmark (Listing C; just insert it into the benchmark body).
Now a typical run looks like this:

LockFreePageQueue: 33ms
PageQueue: 25ms

Now both of them are about 20x faster than the original lock-free code - and Marc's solution with the added chunking is now actually faster than the lock-free code with chunking!
Instead of going with a lock-free structure that promised at most a 2x speedup, we tried another solution that works just fine with locking and got a 20x(!) speedup.
The key to many problems is not so much avoiding locking - it is much more about avoiding sharing and minimizing locking. In the above case, we can avoid sharing for the duration of the byte copying.
We can work on a private structure most of the time and then enqueue a single pointer, thereby shrinking the shared space and time to a single insertion of a single pointer into a queue.

Listing A, a lock-free, single producer, single consumer queue:

public class BoundedSingleProducerSingleConsumerQueue<T>
{
    T[] queue;
    volatile int tail;  // written only by the producer
    volatile int head;  // written only by the consumer

    public BoundedSingleProducerSingleConsumerQueue(int capacity)
    {
        // One slot is kept empty to distinguish "full" from "empty",
        // hence capacity + 1.
        queue = new T[capacity + 1];
        tail = head = 0;
    }

    // Producer only. Returns false if the queue is full.
    public bool TryEnqueue(T item)
    {
        int newtail = (tail + 1) % queue.Length;
        if (newtail == head) return false;
        queue[tail] = item;
        tail = newtail;  // volatile write publishes the item
        return true;
    }

    // Consumer only. Returns false if the queue is empty.
    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (head == tail) return false;
        item = queue[head];
        queue[head] = default(T);          // release the reference for the GC
        head = (head + 1) % queue.Length;  // volatile write frees the slot
        return true;
    }
}

Listing B, a micro-benchmark:

class Program
{
    static void Main(string[] args)
    {
        for (int numtrials = 3; numtrials > 0; --numtrials)
        {
            // Pin the consumer (this thread) to core 0; the producer
            // below is pinned to core 1.
            using (ProcessorAffinity.BeginAffinity(0))
            {
                int pagesize = 1024 * 10;
                int numpages = 1024;
                int totalbytes = pagesize * numpages;

                BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ; // spin until there is room
                        }
                    }
                });
                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp;
                    while (!lockFreeByteQueue.TryDequeue(out tmp)) ; // spin until a byte arrives
                }
                sw.Stop();
                Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);


                SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                sw.Reset();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            byteQueue.Enqueue((byte)(i & 0xFF));
                        }
                    }
                });

                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp = byteQueue.Dequeue();
                }
                sw.Stop();
                Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}

Listing C, chunked tests:

BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            while (!lockfreePageQueue.TryEnqueue(page)) ; // spin until there is room
        }
    }
});
for (int i = 0; i < numpages; i++)
{
    byte[] page;
    while (!lockfreePageQueue.TryDequeue(out page)) ; // spin until a page arrives
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);

SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);

ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            pageQueue.Enqueue(page);
        }
    }
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
    byte[] page = pageQueue.Dequeue();
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);
andras
Thanks Andras and all others. Your answer is perfect and suitable for my case. In fact I don't need byte-by-byte transfer, as you mentioned. I can estimate the size of 1 second of feed for my consumer and use that as each ByteArray's size, and if I need a 5-second cache for QoS then the queue size would be 5 (members). If the consumer's speed fluctuates, the ByteArray size changes. This implementation makes sense and maps to my design, where I need tight control over RAM consumption on the server :D
Xaqron
Sorry, I finally did it! Thanks again.
Xaqron
A: 

The most important part is the design of the shared object. In my scenario the reader and the writer can use separate buffers (big data chunks) independently, and only access to a shared FIFO object like a queue has to be synchronized. This way lock time is minimized and the threads can complete the job in parallel. And with the .NET Framework 4.0, implementing this concept is easy:

There's a ConcurrentQueue(Of T) class in the System.Collections.Concurrent namespace, and an array of Byte is a good element type for my scenario. There are other thread-safe collections in that namespace as well.

http://msdn.microsoft.com/en-us/library/system.collections.concurrent.aspx
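
A minimal sketch of that pattern (mine; the chunk size and counts are made up): the threads do all byte work on private buffers, and only the enqueue/dequeue of a chunk reference touches shared state:

using System;
using System.Collections.Concurrent;
using System.Threading;

class ChunkedPipe
{
    static void Main()
    {
        // Shared FIFO: the only synchronized operations are the
        // enqueue/dequeue of a chunk reference.
        ConcurrentQueue<byte[]> shared = new ConcurrentQueue<byte[]>();

        Thread producer = new Thread(delegate()
        {
            for (int i = 0; i < 5; i++)
            {
                byte[] chunk = new byte[4096];  // private buffer (e.g. 1s of feed)
                // ... fill the chunk; no synchronization needed ...
                shared.Enqueue(chunk);
            }
        });
        producer.Start();

        for (int received = 0; received < 5; )
        {
            byte[] chunk;
            if (shared.TryDequeue(out chunk))
            {
                received++;
                // ... consume the chunk privately ...
            }
        }
    }
}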

Xaqron