I'm starting with the C# code example here. I'm trying to adapt it for a couple of reasons: 1) in my scenario, all tasks will be put in the queue up front, before the consumers start, and 2) I wanted to abstract the worker into a separate class instead of having raw Thread members within the WorkQueue class.

My queue doesn't seem to dispose of itself, though. It just hangs, and when I break in Visual Studio it's stuck on the _th.Join() line for WorkerThread #1. Also, is there a better way to organize this? Something about exposing the WaitOne() and Join() methods seems wrong, but I couldn't think of an appropriate way to let the WorkerThread interact with the queue.

Also, an aside - if I call q.Start(#) at the top of the using block, only some of the threads ever kick in (e.g. threads 1, 2, and 8 process every task). Why is this? Is it a race condition of some sort, or am I doing something wrong?

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;

namespace QueueTest
{
    class Program
    {
        static void Main(string[] args)
        {
            using (WorkQueue q = new WorkQueue())
            {
                q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });

                Random r = new Random();
                foreach (int i in Enumerable.Range(1, 10))
                    q.Enqueue(r.Next(100, 500));

                Console.WriteLine("All jobs queued");
                q.Start(8);
            }
        }
    }

    class WorkQueue : IDisposable
    {
        private Queue<int> _jobs = new Queue<int>();
        private int _job_count;
        private EventWaitHandle _wh = new AutoResetEvent(false);
        private object _lock = new object();
        private List<WorkerThread> _th;
        public event Action Finished;

        public WorkQueue()
        {
        }

        public void Start(int num_threads)
        {
            _job_count = _jobs.Count;
            _th = new List<WorkerThread>(num_threads);
            foreach (int i in Enumerable.Range(1, num_threads))
            {
                _th.Add(new WorkerThread(i, this));
                _th[_th.Count - 1].JobFinished += new Action<int>(WorkQueue_JobFinished);
            }
        }

        void WorkQueue_JobFinished(int obj)
        {
            lock (_lock)
            {
                _job_count--;
                if (_job_count == 0 && Finished != null)
                    Finished();
            }
        }

        public void Enqueue(int job)
        {
            lock (_lock)
                _jobs.Enqueue(job);

            _wh.Set();
        }

        public void Dispose()
        {
            Enqueue(Int32.MinValue);
            _th.ForEach(th => th.Join());
            _wh.Close();
        }

        public int GetNextJob()
        {
            lock (_lock)
            {
                if (_jobs.Count > 0)
                    return _jobs.Dequeue();
                else
                    return Int32.MinValue;
            }
        }

        public void WaitOne()
        {
            _wh.WaitOne();
        }
    }

    class WorkerThread
    {
        private Thread _th;
        private WorkQueue _q;
        private int _i;

        public event Action<int> JobFinished;

        public WorkerThread(int i, WorkQueue q)
        {
            _i = i;
            _q = q;
            _th = new Thread(DoWork);
            _th.Start();
        }

        public void Join()
        {
            _th.Join();
        }

        private void DoWork()
        {
            while (true)
            {
                int job = _q.GetNextJob();
                if (job != Int32.MinValue)
                {
                    Console.WriteLine("Thread {0} Got job {1}", _i, job);
                    Thread.Sleep(job * 10); // in reality would do actual work here
                    if (JobFinished != null)
                        JobFinished(job);
                }
                else
                {
                    Console.WriteLine("Thread {0} no job available", _i);
                    _q.WaitOne();
                }
            }
        }
    }
}
+5  A: 

The worker threads are all blocking on the _q.WaitOne() call in DoWork(). Calling the thread's Join() method will deadlock because the threads never exit. You'll need to add a mechanism to signal the worker threads to exit. A ManualResetEvent, tested with WaitAny in the worker, will get the job done.
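As a rough illustration of that suggestion, here is a minimal sketch of a worker that waits on two handles at once. The names SignalledQueue, _jobReady and _exit are made up for this example and are not from the question's code; it also assumes that Dispose() should let queued jobs finish before the threads exit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch only: SignalledQueue, _jobReady and _exit are illustrative names.
class SignalledQueue : IDisposable
{
    private readonly Queue<int> _jobs = new Queue<int>();
    private readonly object _lock = new object();
    private readonly AutoResetEvent _jobReady = new AutoResetEvent(false);
    private readonly ManualResetEvent _exit = new ManualResetEvent(false);
    private readonly List<Thread> _threads = new List<Thread>();
    private int _processed;

    // Meant to be read after Dispose() has joined the workers.
    public int Processed { get { return _processed; } }

    public SignalledQueue(int numThreads)
    {
        for (int i = 0; i < numThreads; i++)
        {
            Thread t = new Thread(Work);
            _threads.Add(t);
            t.Start();
        }
    }

    public void Enqueue(int job)
    {
        lock (_lock)
            _jobs.Enqueue(job);
        _jobReady.Set();
    }

    private bool TryDequeue(out int job)
    {
        lock (_lock)
        {
            if (_jobs.Count > 0)
            {
                job = _jobs.Dequeue();
                return true;
            }
            job = 0;
            return false;
        }
    }

    private void Work()
    {
        WaitHandle[] handles = { _jobReady, _exit };
        while (true)
        {
            int job;
            // Drain everything that is queued before going back to sleep.
            while (TryDequeue(out job))
            {
                Thread.Sleep(job); // stand-in for real work
                Interlocked.Increment(ref _processed);
            }

            // WaitAny returns the index of the signalled handle; index 1 is _exit.
            if (WaitHandle.WaitAny(handles) == 1)
                return;
        }
    }

    public void Dispose()
    {
        _exit.Set(); // manual-reset: stays signalled, so every worker sees it
        foreach (Thread t in _threads)
            t.Join();
        _jobReady.Close();
        _exit.Close();
    }
}
```

Because the exit event is manual-reset, one Set() is seen by every worker, whereas an auto-reset event would wake only one of them. Creating the queue with `new SignalledQueue(8)`, enqueuing jobs, and then disposing it should return from Dispose() once the queued jobs have been processed.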

One debugging tip: get familiar with the Debug + Windows + Threads window. It lets you switch between threads and look at their call stacks. You'd have quickly found this problem by yourself.

Hans Passant
+1  A: 

You do a WaitOne() at the end of DoWork, but you never set the event after the threads start running.
Note that an AutoResetEvent goes back to the non-signaled state after a successful WaitOne().
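The auto-reset behaviour can be seen in isolation with a small experiment. This is only a sketch; AutoResetDemo and ReleasedBySingleSet are hypothetical names invented for the illustration. Several threads wait on one AutoResetEvent, the event is set once, and the method counts how many threads got through.

```csharp
using System;
using System.Threading;

// Sketch only: AutoResetDemo is an illustrative name, not part of the question.
static class AutoResetDemo
{
    // Returns how many waiting threads a single Set() lets through.
    public static int ReleasedBySingleSet(int waiters)
    {
        using (AutoResetEvent gate = new AutoResetEvent(false))
        {
            int released = 0;
            Thread[] threads = new Thread[waiters];
            for (int i = 0; i < waiters; i++)
            {
                threads[i] = new Thread(delegate()
                {
                    gate.WaitOne(); // consumes the signal and resets the event
                    Interlocked.Increment(ref released);
                });
                threads[i].IsBackground = true;
                threads[i].Start();
            }

            gate.Set(); // one signal only

            // Wait for the first thread to get through, then leave time
            // for any others that might (wrongly) follow it.
            while (Interlocked.CompareExchange(ref released, 0, 0) == 0)
                Thread.Sleep(1);
            Thread.Sleep(200);
            int result = Interlocked.CompareExchange(ref released, 0, 0);

            // Clean up: keep signalling until every waiter has exited.
            foreach (Thread t in threads)
                while (!t.Join(10))
                    gate.Set();

            return result;
        }
    }
}
```

Calling `AutoResetDemo.ReleasedBySingleSet(3)` returns 1: the remaining threads stay blocked until the event is set again, which is the situation the workers in the question end up in.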

Itay
+1  A: 

Your loop in your DoWork method never finishes. As a result the thread never exits, and thread.Join() will block forever, waiting for it to complete.

You have a WaitOne, but I don't think it's necessary unless there is a reason you want your threadpool to stick around after your work is complete:

    private void DoWork()
    {
        bool done = false;
        while (!done)
        {
            int job = _q.GetNextJob();
            if (job != Int32.MinValue)
            {
                Console.WriteLine("Thread {0} Got job {1}", _i, job);
                Thread.Sleep(job * 10); // in reality would do actual work here
                if (JobFinished != null)
                    JobFinished(job);
            }
            else
            {
                Console.WriteLine("Thread {0} no job available", _i);
                done = true;
            }
        }
    }

If you want the threads to stick around so you don't have to create new threads each time WorkQueue.Start is called, you'd have to do something more elaborate with the AutoResetEvent.

Anderson Imes
+1  A: 

Your main problem is the deterministic deadlock described in the other answers.

The correct way to handle it, though, is not to fix the deadlock, but to eliminate the Event altogether.

The whole idea of the Producer-Consumer model is that the clients enqueue and dequeue elements concurrently, and that's why sync mechanisms are required. The "Event" is there to let "Consumers" wait for new elements to be enqueued. If you're enqueuing all of the elements beforehand and then only dequeuing concurrently, that waiting never happens in your case (based on your description), so you only need a lock on the dequeue.

Also, the "single responsibility" design principle suggests that the threading code should be separated from the "Blocking Queue" code. Make the "Blocking Queue" a class of its own, then use it in your thread-management class.
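A minimal sketch of what that could look like when every job is known up front is shown below. PrefilledQueue and WorkerPool are hypothetical names chosen for the example, not types from the question or from the framework. The queue class only knows how to hand out jobs under a lock, and the thread management lives elsewhere.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch only: PrefilledQueue and WorkerPool are illustrative names.
// The queue is filled once, so the only shared operation is the dequeue.
class PrefilledQueue
{
    private readonly Queue<int> _jobs;
    private readonly object _lock = new object();

    public PrefilledQueue(IEnumerable<int> jobs)
    {
        _jobs = new Queue<int>(jobs);
    }

    public bool TryDequeue(out int job)
    {
        lock (_lock)
        {
            if (_jobs.Count > 0)
            {
                job = _jobs.Dequeue();
                return true;
            }
            job = 0;
            return false;
        }
    }
}

// Thread management is kept apart from the queue itself.
static class WorkerPool
{
    // Runs every queued job on numThreads threads and returns the number processed.
    public static int RunAll(PrefilledQueue queue, int numThreads, Action<int> work)
    {
        int processed = 0;
        List<Thread> threads = new List<Thread>();
        for (int i = 0; i < numThreads; i++)
        {
            Thread t = new Thread(delegate()
            {
                int job;
                // No event needed: an empty queue means the work is done.
                while (queue.TryDequeue(out job))
                {
                    work(job);
                    Interlocked.Increment(ref processed);
                }
            });
            threads.Add(t);
            t.Start();
        }

        // Join is used here only so the caller knows when all jobs have finished.
        foreach (Thread t in threads)
            t.Join();
        return processed;
    }
}
```

With this shape a worker exits as soon as the queue is empty, so the Join() calls cannot hang the way they do in the question. No sentinel value such as Int32.MinValue is needed either.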

Hershi
So, at least for my scenario, I don't even need the EventWaitHandle or the thread Join() calls, right?
toasteroven