Is the class below, a single-reader/single-writer queue, thread-safe? This kind of queue is called lock-free, even though it will block if the queue fills up. The data structure was inspired by Marc Gravell's implementation of a blocking queue here on Stack Overflow.

The point of the structure is to allow a single thread to write data to the buffer, and another thread to read data. All of this needs to happen as quickly as possible.

A similar data structure is described in an article at DDJ by Herb Sutter, except that his implementation is in C++. Another difference is that instead of a vanilla linked list, I use a linked list of arrays.

Rather than just including a snippet of code, I include the whole thing, with comments, under a permissive open source license (MIT License 1.0) in case anyone finds it useful and wants to use it (as-is or modified).

This is related to other questions asked on Stack Overflow about how to create blocking concurrent queues (see Creating a blocking Queue in .NET and Thread-safe blocking queue implementation in .NET).

Here is the code:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / single writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// http://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the node.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next node,
        /// and a count of the occupied items 
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The next node in the list, or null if this is the last node.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the scene
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could be looking at the pointer while we set it, so this
                // should be atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writes. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use Interlocked to ensure that we increment the count 
                    // atomically, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue. Returns false only if there is no more
            /// data and the writer has marked the queue as completed.
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it 
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is a "if (q.IsFull)", but we can't place it 
                        // because that would lead to a Race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first,
                        // will have at least one item. So the following call is safe
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and the writer has not flagged completion.
                    // Wait a millisecond. The system will also context switch. 
                    // This will allow the writing thread some additional resources to pump out 
                    // more data (especially if it itself is multithreaded)
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes, and can't add anymore.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}
+1  A: 

I suspect it is not thread safe - imagine the following scenario:

Two threads enter Cursor.Write while current == BUFFER_SIZE. The first takes the true branch of the if (current == BUFFER_SIZE) statement and gets as far as the line node = new Node(x, node);, so it has not yet set current to 1. Had it done so, a second thread coming in would follow the other path through the if statement. Now imagine that thread 1 loses its time slice at that point and thread 2 gets it: thread 2 sees that the condition still holds and also enters the true branch, when it should have taken the other path.

I haven't run this code either, so I'm not sure if my assumptions are possible in this code, but if they are (i.e. entering cursor.Write from multiple threads when current == BUFFER_SIZE), then it may well be prone to concurrency errors.

Andrew Matthews
He did mention this is intended as a single-writer/single-reader queue.
Franci Penov
Yes, perhaps I should have made the point more prominent. This code is definitely not thread-safe for multiple writers or readers.
cdiggins
A: 

Given that I can't find any reference saying that Interlocked.Exchange does Read or Write blocks, I would say not. I would also question why you want to go lockless, as it seldom gives enough benefit to counter its complexity.

Microsoft had an excellent presentation at the 2009 GDC on this, and you can get the slides here.

Languard
I don't understand what you mean, Interlocked.Exchange does not Read or Write blocks. Could you also explain your logic as to why it is not thread-safe? As for the issue of complexity and benefits, can you be more specific? I have a very concrete example here that I want to know about. Thanks for your time!
cdiggins
+1  A: 

First, I wonder about the assumption in these two lines of sequential code:

                node.data[current++] = x;

                // We have to use Interlocked to ensure that we increment the count 
                // atomically, because the reader could be reading it.
                Interlocked.Increment(ref node.count);

What is to say that the new value of node.data[] has been committed to this memory location? It is not stored in a volatile memory address and therefore can be cached if I understand it correctly? Doesn't this potentially lead to a 'dirty' read? There may be other places the same is true, but this one stood out at a glance.
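
One way to make the ordering explicit is to publish the count with a release store and read it with an acquire load. The following is only a minimal sketch under assumptions: PublishedBuffer, TryWrite and TryRead are hypothetical names that are not part of the original class, and it relies on System.Threading.Volatile, which requires .NET Framework 4.5 or later.

```csharp
using System;
using System.Threading;

// Hypothetical single-writer/single-reader slot array, for illustration only.
class PublishedBuffer<T>
{
    readonly T[] data;
    int count; // number of published items; written only by the writer thread

    public PublishedBuffer(int capacity) { data = new T[capacity]; }

    // Writer thread only: store the item first, then publish it.
    public bool TryWrite(T x)
    {
        int n = count; // only the writer changes count, so a plain read is enough here
        if (n == data.Length) return false;
        data[n] = x;
        // Release store: the element write above cannot be reordered after it.
        Volatile.Write(ref count, n + 1);
        return true;
    }

    // Reader thread only: returns false if the item at index is not published yet.
    public bool TryRead(int index, out T x)
    {
        // Acquire load: the element read below cannot be reordered before it.
        if (index < Volatile.Read(ref count))
        {
            x = data[index];
            return true;
        }
        x = default(T);
        return false;
    }
}
```

The point of the sketch is the pairing: the reader only touches data[index] after it has observed a count that covers that index.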

Second, multi-threaded code that contains the following:

Thread.Sleep(int);

... is never a good sign. If it's required then the code is destined to fail; if it isn't required, it's a waste. I really wish they would remove this API entirely. Realize that it is a request to wait at least that amount of time. With the overhead of context switching you're almost certainly going to wait longer, a lot longer.
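
As a rough sketch of the alternative to polling with Thread.Sleep, the reader can block on a wait handle that the writer signals after each write. DataSignal, NotifyData and WaitForData are hypothetical names, not part of the original class, and this assumes the cost of signalling on every write is acceptable.

```csharp
using System;
using System.Threading;

// Hypothetical helper: lets a reader block until the writer signals new data.
class DataSignal
{
    readonly AutoResetEvent dataAvailable = new AutoResetEvent(false);

    // Writer calls this after it has published an item.
    public void NotifyData()
    {
        dataAvailable.Set();
    }

    // Reader calls this when it finds nothing to read.
    // Returns true if signalled, false if the timeout elapsed first.
    public bool WaitForData(int timeoutMs)
    {
        return dataAvailable.WaitOne(timeoutMs);
    }
}
```

After waking up, the reader still has to re-check the queue and the completed flag, since a signal only says that something may have changed. The timeout keeps the reader from waiting forever if the writer finishes without signalling.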

Third, I completely don't understand the use of the Interlock API here. Maybe I'm tired and just missing the point; but I can't find the potential thread conflict on both threads reading & writing to the same variable? It would seem that the only use I could find for interlock exchange would be to modify the contents of node.data[] to fix #1 above.

Lastly, it would seem that the implementation is somewhat over-complicated. Am I missing the point of the whole Cursor/Node thing, or is it basically doing the same thing as this class? (Note: I haven't tried it and I don't think this is thread safe either, just trying to boil down what I think you're doing.)

class ReaderWriterQueue<T>
{
 readonly AutoResetEvent _readComplete;
 readonly T[] _buffer;
 readonly int _maxBuffer;
 int _readerPos, _writerPos;

 public ReaderWriterQueue(int maxBuffer)
 {
  _readComplete = new AutoResetEvent(true);
  _maxBuffer = maxBuffer;
  _buffer = new T[_maxBuffer];
  _readerPos = _writerPos = 0;
 }

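 // Advances an index by one slot, wrapping around at the end of the buffer.
 public int Next(int current) { return ++current == _maxBuffer ? 0 : current; }

 public bool Read(ref T item)
 {
  if (_readerPos != _writerPos)
  {
   item = _buffer[_readerPos];
   _readerPos = Next(_readerPos);
   // A slot was freed, so wake the writer if it is waiting.
   _readComplete.Set();
   return true;
  }
  else
   return false;
 }

 public void Write(T item)
 {
  int next = Next(_writerPos);

  // The buffer is full when advancing the writer would catch up with the reader.
  while (next == _readerPos)
   _readComplete.WaitOne();

  // Store into the slot the reader will see once _writerPos advances.
  _buffer[_writerPos] = item;
  _writerPos = next;
 }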
}

So am I totally off-base here and am failing to see the magic in the original class?

I must admit one thing, I despise Threading. I've seen the best developers fail at it. This article gives a great example on how hard it is to get threading right: http://www.yoda.arachsys.com/csharp/singleton.html

csharptest.net
1. Good point. Looks like a clear fail. 2. Here is my rationale for Thread.Sleep(1): a) both threads are going at full tilt with no user interaction; b) if the Thread.Sleep(1) is reached, then the consumer is faster than the producer. So just let the OS use the time to run the producer's thread and pump more data into the queue. One msec on a 3GHZ machine should be 3 million instructions. Enough to do a context switch and pump a significant amount of data back into the queue. 3. Interlock is used with paranoia. :-) 4. See next comment, out of space.
cdiggins
4. My goal with the design of a linked list of buffers was to allow this queue to be small or big as needed. If consumption is fast then only a small buffer is needed; otherwise a much bigger buffer may be needed. My goal is to have a whole chain of these buffers working in tandem. Your comments are awesome. I too really hate threading. My goal is to create some primitive threading tools that can be used to construct parallel algorithms with no headache (e.g. streaming from producers to consumers with no worries about thread problems).
cdiggins
I just realized there was a misleading constant in my implementation: max buffers of 4. This kind of defeated the design purpose described above. Making it 100 is more sensible, and now the array can grow or shrink from 2k to 200k depending on its needs (assuming a 32 bit system).
cdiggins
#1 - you can fix it with Thread.VolatileWrite(). #2 - is not needed as your writer will block and wait anyway with the wait handle. #3 - I'd remove the extraneous calls to Interlocked; they don't make sense. Use volatile writes if you want to be paranoid. #4 - Ahh, so that makes more sense now, having the buffer scale. I could see wanting to do that if your buffer was .5 MB and you wanted to scale up without hitting the large object heap. Still, I think the overall code could be a little simpler by killing off the Cursor.
csharptest.net
RE: "One msec on a 3GHZ machine should be 3 million instructions..." It sounds like you're unaware of the nature of Thread.Sleep. See http://stackoverflow.com/questions/508208/what-is-the-impact-of-thread-sleep1-in-c for more info. Basically, in a default scenario, Thread.Sleep(1) will almost always result in at least the equivalent of Thread.Sleep(15ish).
Chris
+5  A: 

Microsoft Research CHESS should prove to be a good tool for testing your implementation.

GregC
Ahhhh.... http://research.microsoft.com/en-us/projects/chess/ This is probably technically the correct answer to my question: is X written in .NET thread-safe? Run CHESS. Cool!!!
cdiggins
A: 

Beware of the double checked - single lock pattern (as in a link quoted above: http://www.yoda.arachsys.com/csharp/singleton.html)

Quoting verbatim from "Modern C++ Design" by Andrei Alexandrescu:

    Very experienced multithreaded programmers know that even the Double-Checked Locking pattern, although correct on paper, is not always correct in practice. In certain symmetric multiprocessor environments (the ones featuring the so-called relaxed memory model), the writes are committed to the main memory in bursts, rather than one by one. The bursts occur in increasing order of addresses, not in chronological order. Due to this rearranging of writes, the memory as seen by one processor at a time might look as if the operations are not performed in the correct order by another processor. Concretely, the assignment to pInstance_ performed by a processor might occur before the Singleton object has been fully initialized! Thus, sadly, the Double-Checked Locking pattern is known to be defective for such systems
hackworks
The CLI should enforce the memory model of the .NET CLI, regardless of what the memory model of the SMP environment is. But yes, knowing the memory model is essential for lock-free programming.
dmeister
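
For the singleton case discussed in this answer, one way to avoid hand-written double-checked locking in .NET 4 or later is to let Lazy<T> handle the publication. This is only a sketch, and Singleton is a hypothetical class used for illustration.

```csharp
using System;

// Hypothetical singleton, for illustration only.
sealed class Singleton
{
    // Lazy<T> with the default mode runs the factory once, even under contention,
    // and publishes the fully constructed instance to all threads.
    static readonly Lazy<Singleton> instance =
        new Lazy<Singleton>(() => new Singleton());

    public static Singleton Instance
    {
        get { return instance.Value; }
    }

    // Private constructor so callers cannot create further instances.
    Singleton() { }
}
```

Every caller that reads Singleton.Instance gets the same object, and none of the ordering concerns in the quote have to be handled by hand.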