Hi guys,

I have a thread that creates a variable number of worker threads and distributes tasks among them. This is done by passing each thread a TaskQueue object, whose implementation you can see below.

These worker threads simply iterate over the TaskQueue object they were given, executing each task.

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

As you can see, I'm using an AutoResetEvent object to make sure that the worker threads don't exit prematurely, i.e. before getting any tasks.

In a nutshell:

  • the main thread assigns a task to a thread by Enqueue-ing a task to its TaskQueue
  • the main thread notifies the thread that there are no more tasks to execute by calling the TaskQueue's Finish() method
  • the worker thread retrieves the next task assigned to it by calling the TaskQueue's Dequeue() method

The problem is that the Dequeue() method often throws an InvalidOperationException, saying that the Queue is empty. As you can see I added some logging, and it turns out, that the AutoResetEvent doesn't block the Dequeue(), even though there were no calls to its Set() method.

As I understand it, calling AutoResetEvent.Set() allows one waiting thread to proceed (a thread that previously called AutoResetEvent.WaitOne()), and then the event automatically resets, blocking the next waiter.

So what can be wrong? Did I get something wrong? Do I have an error somewhere? I've been sitting on this for 3 hours now, but I cannot figure out what's wrong. Please help me!

Thank you very much!

+6  A: 

Your dequeue code is incorrect. You check the Count under a lock, release the lock, and then expect the queue to still have something in it. You cannot retain assumptions made under a lock after you release it :). Your Count check and tasks.Dequeue() must occur under the same lock:

bool TryDequeue(out Task task)
{
  task = null;
  lock (this.tasks) {
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  if (null == task) {
    Log.Trace("Queue was empty");
  }
  return null != task;
}

Your Enqueue() code is similarly riddled with problems. Your Enqueue/Dequeue pair doesn't ensure progress (you can have dequeue threads blocked waiting even though there are items in the queue). Your Dequeue() signature is wrong, too. Overall this is very poor code; frankly, I think you're trying to bite off more than you can chew here... Oh, and never log under a lock.
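To see why, trace one interleaving of the original Enqueue/Dequeue (an illustrative reconstruction; other bad schedules exist):

```csharp
// One interleaving of the original code, single producer (P) and single
// consumer (C), that ends in the reported InvalidOperationException:
//
//   P: Enqueue(a)  -> Count == 1, so taskWaitHandle.Set()   [event signaled]
//   C: Dequeue()   -> sees Count == 1, skips WaitOne(),
//                     dequeues 'a'                          [event STILL signaled]
//   C: Dequeue()   -> sees Count == 0, calls WaitOne()
//                  -> returns immediately on the stale signal
//                  -> tasks.Dequeue() on an empty queue
//                  -> InvalidOperationException
//
// With several consumers it is worse still: two threads can both observe
// Count == 1, both skip the wait, and race to dequeue the single item.
```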

I strongly suggest you just use ConcurrentQueue.

If you don't have access to .Net 4.0 here is an implementation to get you started:

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Remus Rusanu
Damn, that's pretty harsh.
Dan Tao
I'm using .NET 3.5, so ConcurrentQueue is out of the question. I really appreciate your input, but I'm afraid I don't understand why you're saying this is very poor code. Maybe you could elaborate on this, and add some correct code? As for your TryDequeue() suggestion - it's completely against the purpose of this class. Calling the Dequeue() method ALWAYS has to return something; if the process is to be finished, an empty Task object. If there are no tasks in the queue, it should wait until there is one. Please update your post.
ShdNx
"Never log under lock". Not even trace? Much too broad brush a statement. How do you debug code in your logging framework? By telepathy?
Steve Townsend
@Dan Tao: I know is harsh, and is intentional.
Remus Rusanu
@Steve Townsend: yes, never log nor trace under a lock. You're very likely to introduce deadlocks because of lock cycles completed through the logging infrastructure (file, db, etc.). SO itself found itself in exactly that place early on: http://www.codinghorror.com/blog/2008/08/deadlocked.html
Remus Rusanu
I highly recommend going over something like http://www.amazon.com/Art-Multiprocessor-Programming-Maurice-Herlihy/dp/0123705916 before attempting to put this kind of code in production.
Remus Rusanu
@Remus: I am assuming you meant to say `BlockingCollection` instead of `ConcurrentQueue`? Microsoft's implementation of `ConcurrentQueue` does not block.
Brian Gideon
Thank you very much Remus, your code is working. Based on your code I'm beginning to see where mine failed. I still have a lot to learn in terms of concurrent programming. Thanks again!
ShdNx
@Remus same as ShdNx, you just saved me 2 days
remi bourgarel
+1  A: 

It looks like you are trying to replicate a blocking queue. One already exists in the .NET 4.0 BCL as BlockingCollection. If .NET 4.0 is not an option for you, then you can use this code. It uses the Monitor.Wait and Monitor.Pulse methods instead of an AutoResetEvent.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}
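A usage sketch for the class above. Note that this minimal queue has no equivalent of your Finish() method, so one common trick (shown here with a hypothetical null sentinel, one per worker) is needed to shut workers down:

```csharp
// Hypothetical usage sketch, assuming the BlockingCollection above and
// your own Task class. Execute() is a stand-in for however a Task runs.
var queue = new BlockingCollection<Task>();

// Worker thread: Take() blocks until an item is available; a null
// sentinel ends the loop.
for (Task t = queue.Take(); t != null; t = queue.Take())
{
    t.Execute(); // hypothetical method on your Task class
}

// Main thread, when shutting down:
// queue.Add(null); // once per worker
```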

Update:

I am fairly certain that it is not possible to implement a producer-consumer queue using AutoResetEvent if you want it to be thread-safe for multiple producers and multiple consumers (I am prepared to be proven wrong if someone can come up with a counter example). Sure, you will see examples on the internet, but they are all wrong. In fact, one such attempt by Microsoft is flawed in that the queue can get live-locked.
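A counting Semaphore, by contrast, can work, because the semaphore's count exactly mirrors the number of queued items, so wakeups are never lost the way they are with an ARE. A minimal sketch (shutdown logic omitted; class and member names are illustrative):

```csharp
// Minimal Semaphore-based variant, no shutdown signal shown.
public class SemaphoreBlockingQueue<T>
{
    private readonly Queue<T> m_Queue = new Queue<T>();
    private readonly Semaphore m_ItemsAvailable = new Semaphore(0, int.MaxValue);

    public void Add(T data)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
        }
        m_ItemsAvailable.Release(); // one permit per item
    }

    public T Take()
    {
        m_ItemsAvailable.WaitOne(); // blocks until a permit (= an item) exists
        lock (m_Queue)
        {
            return m_Queue.Dequeue();
        }
    }
}
```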

Brian Gideon
Thank you very much for your suggestion, I'll look into Monitor.Wait and Monitor.Pulse and get back to you. Looks promising though!Also thanks for suggesting the BlockingCollection - I'm using .NET 3.5, but it looks interesting.
ShdNx
@ShdNx: You will have to write your own then. I would stick with the canonical implementations though, as the code can be very tricky to get right if you're writing it from scratch. There is a solution using a `Semaphore`, but I do not think one exists for an `AutoResetEvent`. The reason is that an ARE does not count and cannot be used to wait from inside a lock (safely, anyway). Personally, I would stick with the exclusive `Monitor` solution as that is the most well known.
Brian Gideon
@ShdNx: Nevermind, Dan just pointed out that a backport exists as part of the Rx extension library. I would go with that if you are not comfortable using the code I posted here. Plus, you will get more than just the trivial `Add` and `Take` methods.
Brian Gideon
+3  A: 

A more detailed answer from me is pending, but I just want to point out something very important.

If you're using .NET 3.5, you can use the ConcurrentQueue<T> class. A backport is included in the Rx extensions library, which is available for .NET 3.5.

Since you want blocking behavior, you would need to wrap a ConcurrentQueue<T> in a BlockingCollection<T> (also available as part of Rx).
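The worker loop then collapses to a foreach. A sketch against the .NET 4 / Rx BlockingCollection API (Execute() is a stand-in for however your Task class runs a task, and someTask is a placeholder):

```csharp
var queue = new BlockingCollection<Task>(new ConcurrentQueue<Task>());

// Main thread:
queue.Add(someTask);    // instead of TaskQueue.Enqueue
queue.CompleteAdding(); // instead of TaskQueue.Finish

// Worker thread: blocks until items arrive, exits cleanly once
// CompleteAdding() has been called and the queue is drained.
foreach (Task t in queue.GetConsumingEnumerable())
{
    t.Execute(); // hypothetical method on your Task class
}
```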

Dan Tao
+1, BlockingCollection is the way to go. I'm pretty sure most developers have implemented their own blocking queue for use as a way to buffer tasks, messages, what have you. I'm also pretty sure that most of those developers (including myself) got it wrong the first few times. Threading is _hard_ and probably used far too often.
Dan Bryant
@Dan Bryant: Ha, I would even go one step further. Most if not all of those developers (and I am *also* among them) have gotten it wrong *every* time, *including* their final attempts, when they believed they'd figured it all out and patted themselves on the back for a job well done. Most of those implementations have bugs which may lie dormant for a long time, much like the [binary search implementation that was considered proven correct for decades before it was revealed to contain a bug](http://googleresearch.blogspot.com/2006/06/extra-extra-read-all-about-it-nearly.html).
Dan Tao
@Dan Tao, in that case, let's hope that MS got it right :) I have a feeling we're going to see more of these problems start popping up as 4+ core processors and aggressive caching optimizations become more common-place. It'll be interesting to see whether tools like CHESS become mainstream: http://msdn.microsoft.com/en-us/devlabs/cc950526.aspx
Dan Bryant
@Dan Bryant: Quite possibly. I'm actually pretty convinced that our entire approach to concurrent programming is going to undergo a paradigm shift of some sort, simply because of how complex these problems end up being and how difficult they are to solve by mere mortals using our current methods. I wonder if there is a fundamentally different strategy. I don't know what it would be, of course. If I did, I'd probably be giving speeches at conferences about it.
Dan Tao
@Dan: Absolutely. I posted one example in my answer where the Microsoft documentation has a bad implementation of a blocking queue. I have seen the magazine article from Joe Duffy http://msdn.microsoft.com/en-us/magazine/cc163427.aspx#S4 which has a blocking queue that acts more like the `Barrier` class. The fact is experts get it wrong all the time. I have learned to be very suspicious.
Brian Gideon
Thank you very much Dan, I didn't know a backport existed. In this situation Remus' answer is the best, though I'd still be most grateful if you posted a more detailed answer elaborating on the problem(s) my code (and generally, my approach) had.
ShdNx
@Brian: Exactly. Even when using the `ConcurrentQueue<T>`, I remain cautious, though fairly relaxed since at least I have the confidence that it's been rigorously tested (more than can be said for many developers' homegrown implementations, to be sure). But crack that puppy open in Reflector and you find that it's ridiculously complex. Working your way through code that deep always brings with it a certain degree of uncertainty. Of course I'm quite sure that whoever wrote the `ConcurrentQueue<T>` class knew what he/she was doing *far* better than *I* would, at least.
Dan Tao
@ShdNx: I do intend to update this answer sometime in the near future. In the meantime I'm glad Remus's answer seems to have worked well for you.
Dan Tao
By the way, to be fair to Joe Duffy I should clarify my comment about his implementation. He even talks about his implementation not being of the canonical form. I am quite sure it does exactly what he wanted it to do. In my opinion (and I am no expert by any means) he should not have called it a `BlockingQueue` at all, because it really does not behave strictly like a queue, and we mere mortals may naively attempt to use it as such. Because of the way the producers and consumers must rendezvous, it seems to behave more like a `Barrier`, though that is not a completely accurate comparison.
Brian Gideon