I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size, all threads that are filling the queue will be blocked on add until an item is removed from the queue.

The solution below is what I am using right now, and my question is: how can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
A: 

If you want maximum throughput, allowing multiple readers to read and only one writer to write, the BCL has something called ReaderWriterLockSlim that should help slim down your code...

DavidN
I don't want anyone to be able to write if the queue is full, though.
spoon16
So you combine it with a lock. Here are some very good examples: http://www.albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle and http://www.albahari.com/threading/part4.aspx
DavidN
With queue/dequeue, everyone is a writer... an exclusive lock will perhaps be more pragmatic.
Marc Gravell
+36  A: 

That looks very unsafe (very little synchronization); how about something like:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}
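
For illustration, minimal producer/consumer usage of this SizeQueue might look like the following (the thread setup and counts here are mine, purely a sketch):

var queue = new SizeQueue<int>(10);

var producer = new Thread(() =>
{
    for (int i = 0; i < 100; i++)
    {
        queue.Enqueue(i); // blocks whenever 10 items are already queued
    }
});

var consumer = new Thread(() =>
{
    for (int i = 0; i < 100; i++)
    {
        Console.WriteLine(queue.Dequeue()); // blocks while the queue is empty
    }
});

producer.Start();
consumer.Start();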

(edit)

In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
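
For illustration, a consumer loop against this Close/TryDequeue pair might look like the following (Process is a hypothetical handler, not part of the answer):

// Worker thread: drain items until Close() has been called and the queue is empty.
T item;
while (queue.TryDequeue(out item))
{
    Process(item); // hypothetical per-item work
}
// Execution reaches here once closing is set and the queue has been drained.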
Marc Gravell
Thanks Marc, great answer. I'll give it a few hours so that if anyone else has any good ideas they have a chance at the accepted answer. This is great though, thanks!
spoon16
See also edit...
Marc Gravell
Wouldn't you also need to Monitor.PulseAll in case of close?
spoon16
How about changing the wait to a WaitAny and passing in a terminate waithandle on construction ...
Sam Saffron
@sambo99 - for what purpose? At the moment it uses Monitor, which is the cheapest of all the locking constructs, as it doesn't need to go to the OS. A bool and a Monitor are quite a bit cheaper than a waithandle...
Marc Gravell
Yerp, your implementation here is solid, easy to understand and fast. The only advantage of a WaitAny-type approach is that you could have a single event that triggers shutdown without needing a Close method... It may be useful sometimes.
Sam Saffron
Another thing I would look at here is probably making SizeQueue IDisposable. Also, what happens if your queue object goes out of scope while threads are blocked? It would probably be a bugger to debug.
Sam Saffron
@Marc - an optimisation, if you were expecting the queue to always reach capacity, would be to pass the maxSize value into the constructor of the Queue<T>. You could add another constructor to your class to accommodate that (see the sketch below).
RichardOD
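For clarity, the change RichardOD suggests is just this (a sketch against Marc's class, not code from the thread):

// Preallocate the backing store to the known bound:
private readonly Queue<T> queue;
private readonly int maxSize;
public SizeQueue(int maxSize)
{
    this.maxSize = maxSize;
    queue = new Queue<T>(maxSize); // avoids internal array growth
}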
Why SizeQueue, why not FixedSizeQueue?
user144182
@user260197 - because naming stuff is tricky ;-p Feel free to rename it..
Marc Gravell
If you lock on the queue, won't that effectively prevent other threads from messing with it? For instance, Add first locks the queue, and then if the queue is full, it waits for the queue to be signalled; but how can it be signalled if no other thread can take the lock first? There must be something I'm not getting here...
Lasse V. Karlsen
@Lasse - it releases the lock(s) during `Wait`, so other threads can acquire it. It reclaims the lock(s) when it wakes up.
Marc Gravell
Nice, as I said, there was something I wasn't getting :) That sure makes me want to revisit some of my thread-code....
Lasse V. Karlsen
What happens when a thread wakes up and the lock is already taken by someone else? Will it block waiting for the lock to become free and then claim it?
Lasse V. Karlsen
Actually I can think of many questions around this subject, is there a link to some material I can read? If not, I'll try to summarize up some questions in a post of my own here.
Lasse V. Karlsen
@Lasse - when it is pulsed it moves from the sleeping-queue to the ready-queue (as though it was just now trying to take the lock). When the thread that pulsed releases the lock, the *next* thread in the ready-queue (which might be neither of those) gets a go. And so on. MSDN may help, but... Jon's pages have some coverage too.
Marc Gravell
@Lasse - note the drop down at the bottom for the 17 topics: http://www.yoda.arachsys.com/csharp/threads/
Marc Gravell
Wouldn't Pulse instead of PulseAll work fine in Enqueue/Dequeue?
@notso - possibly, but then you largely have to *know* which thread is waiting. And in the case of pulling the queue down you actively *want* to wake up all the workers. In most cases there is only one worker, making it a moot point. In the more complex cases, you've got enough to worry about without stressing over this - you could *carefully* add it, I suppose.
Marc Gravell
@notso In answer to my own question: no, Pulse is not enough since it wakes up a single thread and we do not know whether this thread is in the Enqueue or Dequeue method.
In case anyone cares: I chose not to have a maximum queue size. Thus the Enqueue call never blocks, meaning Dequeue does not need to pulse at all, and Enqueue can use Pulse instead of PulseAll since only Dequeue calls are ever waiting. Note that when closing the queue, PulseAll is of course still necessary. Also, thank you Marc for the quick answer.
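A minimal sketch of that unbounded variant (my reading of the comment above, not code from the thread):

// Unbounded: Enqueue never blocks, so only Dequeue ever waits.
public void Enqueue(T item)
{
    lock (queue)
    {
        queue.Enqueue(item);
        Monitor.Pulse(queue); // only dequeuers wait, so one pulse suffices
    }
}

public T Dequeue()
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            Monitor.Wait(queue);
        }
        return queue.Dequeue(); // no pulse needed: enqueuers never wait
    }
}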
A: 

Well, you might look at the System.Threading.Semaphore class. Other than that - no, you have to make this yourself. AFAIK there is no such built-in collection.

Vilx-
I looked at that for throttling the number of threads that are accessing a resource, but it doesn't allow you to block all access to a resource based on some condition (like Collection.Count). AFAIK, anyway.
spoon16
Well, you do that part yourself, just like you do now. Simply, instead of MaxSize and _FullEvent, you have the Semaphore, which you initialize with the right count in the constructor. Then, upon every Add/Remove you call WaitOne() or Release() (sketched below).
Vilx-
It's not much different than what you have now. Just simpler, IMHO.
Vilx-
Can you give me an example showing this working? I didn't see how to adjust the size of a Semaphore dynamically, which this scenario requires, since you have to be able to block all access only when the queue is full.
spoon16
Ahh, changing size! Why didn't you say so immediately? OK, then a semaphore is not for you. Good luck with this approach!
Vilx-
+1  A: 

I haven't fully explored the TPL, but it might have something that fits your needs, or at the very least some Reflector fodder to snag inspiration from.

Hope that helps.
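
For what it's worth, the Parallel Extensions that shipped in .NET 4 do include exactly this: System.Collections.Concurrent.BlockingCollection<T>, which supports a bounded capacity. A minimal sketch:

using System.Collections.Concurrent;

// Bounded to 10 items: Add blocks when full, Take blocks when empty.
var queue = new BlockingCollection<int>(10);

queue.Add(42);           // producer side
int item = queue.Take(); // consumer side

// Consumers using GetConsumingEnumerable() exit once this is called
// and the remaining items are drained.
queue.CompleteAdding();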

TheMissingLINQ
Thanks for the link, I'll take a closer look at the TPL.
spoon16
+2  A: 

"How can this be improved?"

Well, you need to look at every method in your class and consider what would happen if another thread was simultaneously calling that method or any other method. For example, you put a lock in the Remove method, but not in the Add method. What happens if one thread Adds at the same time as another thread Removes? Bad things.

Also consider that a method can return a second object that provides access to the first object's internal data - for example, GetEnumerator. Imagine one thread is going through that enumerator while another thread modifies the list at the same time. Not good.

A good rule of thumb is to make this simpler to get right by cutting down the number of methods in the class to the absolute minimum.

In particular, don't inherit from another container class, because you will expose all of that class's methods, providing a way for the caller to corrupt the internal data, or to see partially complete changes to the data (just as bad, because the data appears corrupted at that moment). Hide all the details and be completely ruthless about how you allow access to them.

I'd strongly advise you to use off-the-shelf solutions - get a book about threading or use a 3rd-party library. Otherwise, given what you're attempting, you're going to be debugging your code for a long time.

Also, wouldn't it make more sense for Remove to return an item (say, the one that was added first, as it's a queue), rather than the caller choosing a specific item? And when the queue is empty, perhaps Remove should also block.

Update: Marc's answer actually implements all these suggestions! :) But I'll leave this here as it may be helpful to understand why his version is such an improvement.

Daniel Earwicker
I appreciate the detailed explanations. Thanks!
spoon16
A: 

I started a similar question because I didn't find yours. This could help: http://www.eggheadcafe.com/articles/20060414.asp (very similar to Marc Gravell's answer)

Shrike
+2  A: 

This is what I came up with for a thread-safe bounded blocking queue.

using System;
using System.Threading;

public class BlockingBuffer<T>
{
    private object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int capacity)
    {
        if (capacity <= 0)
            throw new ArgumentOutOfRangeException("capacity", "Capacity must be larger than 0");

        t_lock = new object();
        buf = new T[capacity];
        sema_NotEmpty = new Semaphore(0, capacity);
        sema_NotFull = new Semaphore(capacity, capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        // Blocks while the buffer is full.
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            // The semaphore should make this wait unreachable;
            // it is kept only as a belt-and-braces guard.
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            // Write into the circular buffer, wrapping at the end.
            buf[putToIndex++] = item;
            if (putToIndex == size)
                putToIndex = 0;

            numItems++;
            Monitor.Pulse(t_lock);
        }
        // Signal consumers that an item is available.
        sema_NotEmpty.Release();
    }

    public T take()
    {
        T item;

        // Blocks while the buffer is empty.
        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {
            // As in put(), the semaphore should make this wait unreachable.
            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            // Read from the circular buffer, wrapping at the end.
            item = buf[getFromIndex++];
            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;
            Monitor.Pulse(t_lock);
        }
        // Signal producers that a slot is free.
        sema_NotFull.Release();

        return item;
    }
}
Kevin
+1  A: 

I just knocked this up using the Reactive Extensions and remembered this question:

using System.Collections.Generic;
using System.Reactive.Linq;     // blocking GetEnumerator() extension (namespaces per Rx 1.0; earlier previews differed)
using System.Reactive.Subjects; // Subject<T>

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        // The Rx GetEnumerator() extension returns an enumerator that
        // blocks in MoveNext until the subject produces a value.
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync) // serialize producers
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext(); // blocks until an item is available
        return _enumerator.Current;
    }
}

Not necessarily entirely safe, but very simple.

Coder 42