Hello, I'm working on an academic open source project and I now need to create a fast blocking FIFO queue in C#. My first implementation simply wrapped a synchronized queue (with dynamic expansion) in a reader's semaphore; then I decided to re-implement it in the following (theoretically faster) way.
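
For reference, that first implementation was along these lines (a minimal sketch of the design just described, not the actual project code; the class name is illustrative):

    using System.Collections.Generic;
    using System.Threading;

    public class SimpleBlockingQueue<T>
    {
        private readonly Queue<T> _queue = new Queue<T>(); //dynamically expanding
        private readonly Semaphore _readSema = new Semaphore(0, int.MaxValue);

        public void Enqueue(T item)
        {
            lock (_queue)
                _queue.Enqueue(item);
            _readSema.Release(); //wake one waiting reader
        }

        public T Dequeue()
        {
            _readSema.WaitOne(); //block until at least one item is available
            lock (_queue)
                return _queue.Dequeue();
        }
    }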

using System.Threading;

public class FastFifoQueue<T> where T : class //Interlocked.Exchange on the array cells requires a reference type
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item); //atomic store of the item reference
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null); //atomically take the item and clear the slot
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}

This is the classic FIFO queue implementation with a static array that you find in textbooks. It is designed to atomically increment the pointers, and since I can't atomically wrap a pointer back to zero when it reaches (capacity - 1), I compute the modulo separately. In theory, using Interlocked is the same as locking before doing the increment, and since there are semaphores, multiple producers/consumers may enter the queue, but only one at a time is able to modify each queue pointer. Also, because Interlocked.Increment increments first and then returns, I am limited to using the post-increment value, so I start storing items at position 1 of the array. That's not a problem: the index goes back to 0 once the counter reaches a certain value.
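
To make the index arithmetic concrete, here is what the very first Enqueue computes on an empty queue with a capacity of 8 (a worked example of the code above, not additional code):

    //_head starts at int.MinValue (-2147483648)
    int index = Interlocked.Increment(ref _head); //index = -2147483647
    index %= 8;                                   //C#'s % keeps the dividend's sign: index = -7
    if (index < 0) index += 8;                    //index = -7 + 8 = 1
    //so the first item lands in slot 1, the next in slot 2, and so on,
    //wrapping 7 -> 0 as the counter keeps growing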

What's the problem with it? You wouldn't believe it: under heavy load, the queue sometimes returns a NULL value. I am SURE, repeat, I AM SURE, that no method enqueues null into the queue. This is definitely true because I put a null check in Enqueue to be sure, and no error was thrown. I created a test case for it with Visual Studio (by the way, I use a dual-core CPU, like many people).

    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048); //IFifoQueue<T> is the project's queue interface (not shown)
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0)); //let producers and consumers run for one minute

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }

Once a null is received by a consumer thread, an error is counted. When performing the test with 1 producer and 1 consumer, it succeeds. When performing the test with 2 producers and 2 consumers, or more, a disaster happens: even 2000 leaks are detected. I found that the problem may be in the Enqueue method. By design contract, a producer can write only into a cell that is empty (null), but after adding some diagnostics to my code I found that sometimes a producer tries to write into a non-empty cell, which is still occupied by "good" data.

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }

The "too bad" exception then happens quite often. But it's strange that a conflict arises from concurrent writes, because the increments are atomic and the writer's semaphore allows only as many writers as there are free array cells.

Can somebody help me with this? I would really appreciate it if you shared your skills and experience with me.

Thank you.

+4  A: 

I must say, this struck me as a very clever idea, and I thought about it for a while before I started to realize where (I think) the bug is here. So, on one hand, kudos on coming up with such a clever design! But, at the same time, shame on you for demonstrating "Kernighan's Law":

Debugging is twice as hard as writing the code in the first place. Therefore, if you write the code as cleverly as possible, you are, by definition, not smart enough to debug it.

The issue is basically this: you are assuming that the WaitOne and Release calls effectively serialize all of your Enqueue and Dequeue operations; but that isn't quite what is going on here. Remember that the Semaphore class is used to restrict the number of threads accessing a resource, not to ensure a particular order of events. What happens between each WaitOne and Release is not guaranteed to occur in the same "thread-order" as the WaitOne and Release calls themselves.

This is tricky to explain in words, so let me try to provide a visual illustration.

Let's say your queue has a capacity of 8 and looks like this (let 0 represent null and x represent an object):

[ x x x x x x x x ]

So Enqueue has been called 8 times and the queue is full. Therefore your _writeSema semaphore will block on WaitOne, and your _readSema semaphore will return immediately on WaitOne.

Now let's suppose Dequeue is called more or less concurrently on 3 different threads. Let's call these T1, T2, and T3.

Before proceeding let me apply some labels to your Dequeue implementation, for reference:

public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}

OK, so T1, T2, and T3 have all gotten past point A. Then for simplicity let's suppose they each reach line B "in order", so that T1 has an index of 0, T2 has an index of 1, and T3 has an index of 2.

So far so good. But here's the gotcha: there is no guarantee that from here, T1, T2, and T3 are going to get to line D in any specified order. Suppose T3 actually gets ahead of T1 and T2, moving past line C (and thus setting _array[2] to null) and all the way to line D.

After this point, _writeSema will be signaled, meaning you have one slot available in your queue to write to, right? But your queue now looks like this!

[ x x 0 x x x x x ]

So if another thread has come along in the meantime with a call to Enqueue, it will actually get past _writeSema.WaitOne, increment _head, and get an index of 0, even though slot 0 is not empty. The result of this will be that the item in slot 0 could actually be overwritten, before T1 (remember him?) reads it.

To understand where your null values are coming from, you need only to visualize the reverse of the process I just described. That is, suppose your queue looks like this:

[ 0 0 0 0 0 0 0 0 ]

Three threads, T1, T2, and T3, all call Enqueue nearly simultaneously. T3 increments _head last but inserts its item (at _array[2]) and calls _readSema.Release first, resulting in a signaled _readSema but a queue looking like:

[ 0 0 x 0 0 0 0 0 ]

So if another thread has come along in the meantime with a call to Dequeue (before T1 and T2 are finished doing their thing), it will get past _readSema.WaitOne, increment _tail, and get an index of 0, even though slot 0 is empty.

So there's your problem. As for a solution, I don't have any suggestions at the moment. Give me some time to think it over... (I'm posting this answer now because it's fresh in my mind and I feel it might help you.)

Dan Tao
So with this design, no readers can be allowed to proceed while any writers are in Enqueue, and no writers can be allowed to proceed while any readers are in Dequeue.
Les
Thank you Dan!!! Kudos and kudos to you for having made me realize the terrible design flaw in my code! Now I'm thinking about it... the important thing is that the semaphores must be released in the same order in which the threads increment the pointers, but there is no constraint on write order. However, I must remember that T[] (when T is a class) holds a **reference** in each cell, i.e. a pointer. On all platforms, copying a pointer is as fast as an atomic operation. So I believe that locking the whole method, rather than using complex mutex schemes, will perform better. If you have more ideas, here I am ;)
djechelon
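
For reference, "locking the whole method" could look roughly like the sketch below (illustrative only, reusing the fields of the original class): the two semaphores still provide the blocking, while a single lock makes the pointer update and the slot access one atomic step, so release order no longer matters.

    public void Enqueue(T item)
    {
        _writeSema.WaitOne(); //still blocks while the queue is full
        lock (_array) //one lock covers both the index update and the slot write
        {
            _head = (_head + 1) % _capacity;
            if (_head < 0) _head += _capacity;
            _array[_head] = item;
            _count++;
        }
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne(); //still blocks while the queue is empty
        T ret;
        lock (_array)
        {
            _tail = (_tail + 1) % _capacity;
            if (_tail < 0) _tail += _capacity;
            ret = _array[_tail];
            _array[_tail] = null; //clear the slot so leaks remain detectable
            _count--;
        }
        _writeSema.Release();
        return ret;
    }

Because the slot is written in the same critical section that advances the pointer, a reader that passes _readSema can never observe an empty cell.
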
+2  A: 

(+1 to Dan Tao, who I vote has the answer.) The enqueue would be changed to something like this...

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    ;

The dequeue would be changed to something like this...

while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
    ;

This builds upon Dan Tao's excellent analysis. Because the indexes are atomically obtained, then (assuming no threads die or terminate inside the enqueue or dequeue methods) a reader is guaranteed to eventually have its cell filled in, and a writer is guaranteed to eventually have its cell freed (set to null).
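
Merged into the question's original methods, the fix would read roughly as follows (a sketch only: the spin loops are the ones above, the surrounding code is unchanged from the question):

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //spin until the slot is actually free, then claim it atomically
        while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
            ;
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret;
        //spin until the slot is actually filled, then take it atomically
        while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
            ;
        Interlocked.Decrement(ref _count);
        _writeSema.Release();
        return ret;
    }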

Les
A: 

Thank you Dan Tao and Les,

I really appreciate your help a lot. Dan, you opened my mind: it's not important how many producers/consumers are inside the critical section; what's important is that the locks are released in order. Les, you found the solution to the problem.

Now it's time to finally answer my own question with the final code I wrote thanks to the help of both of you. Well, it's not much, but it's a little enhancement of Les's code:

Enqueue:

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    Thread.Sleep(0);

Dequeue:

while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
    Thread.Sleep(0);

Why Thread.Sleep(0)? As you know, atomic operations are very fast. When we have such a queue where T is a reference type, the cells hold references, which are just pointers that can be atomically read/stored with a single CPU instruction. I don't have sophisticated tools to verify it myself, but I believe Les's code may lead to a thread monopolizing the CPU in the while loop until the system timer decides it's time for a context switch. So, when I discover I can't write into a cell because it's not empty, I force a context switch, using a code snippet found at http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html

I also ran the previous test case to get proof of my claims:

without Sleep(0)

Read 6164150 elements
Wrote 6322541 elements
Read 5885192 elements
Wrote 5785144 elements
Wrote 6439924 elements
Read 6497471 elements

with Sleep(0)

Wrote 7135907 elements
Read 6361996 elements
Wrote 6761158 elements
Read 6203202 elements
Wrote 5257581 elements
Read 6587568 elements

I know this is not a "great" discovery and I will win no Turing Award for these numbers. The performance increase is not substantial. Forcing a context switch allows more read/write operations to be performed (to be clear: in my test I evaluate the performance of the queue itself, not a simulated producer/consumer problem, so it doesn't matter if there are still elements in the queue at the end of the one-minute run). But I have demonstrated that my approach works, thanks to you all.

Code available open source under MS-RL: http://logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs?revision=461&view=markup

djechelon