I have a class running the Producer-Consumer model like this:

public class SyncEvents
    {
        public bool waiting;

        public SyncEvents()
        {
            waiting = true;
        }
    }

    public class Producer
    {
        private readonly Queue<Delegate> _queue;
        private SyncEvents _sync;
        private Object _waitAck;

        public Producer(Queue<Delegate> q, SyncEvents sync, Object obj)
        {
            _queue = q;
            _sync = sync;
            _waitAck = obj;
        }

        public void ThreadRun()
        {
            lock (_sync)
            {
                while (true)
                {
                    Monitor.Wait(_sync, 0);
                    if (_queue.Count > 0)
                    {
                        _sync.waiting = false;
                    }
                    else
                    {
                        _sync.waiting = true;
                        lock (_waitAck)
                        {
                            Monitor.Pulse(_waitAck);
                        }
                    }
                    Monitor.Pulse(_sync);
                }
            }
        }

    }

    public class Consumer
    {
        private readonly Queue<Delegate> _queue;
        private SyncEvents _sync;

        private int count = 0;

        public Consumer(Queue<Delegate> q, SyncEvents sync)
        {
            _queue = q;
            _sync = sync;
        }

        public void ThreadRun()
        {
            lock (_sync)
            {
                while (true)
                {
                    while (_queue.Count == 0)
                    {
                        Monitor.Wait(_sync);
                    }

                    Delegate query = _queue.Dequeue();
                    query.DynamicInvoke(null);

                    count++;

                    Monitor.Pulse(_sync);
                }
            }
        }
    }

    /// <summary>
    /// Act as a consumer to the queries produced by the DataGridViewCustomCell
    /// </summary>
    public class QueryThread
    {
        private SyncEvents _syncEvents = new SyncEvents();
        private Object waitAck = new Object();
        private Queue<Delegate> _queryQueue = new Queue<Delegate>();

        Producer queryProducer;
        Consumer queryConsumer;

        public QueryThread()
        {
            queryProducer = new Producer(_queryQueue, _syncEvents, waitAck);
            queryConsumer = new Consumer(_queryQueue, _syncEvents);

            Thread producerThread = new Thread(queryProducer.ThreadRun);
            Thread consumerThread = new Thread(queryConsumer.ThreadRun);

            producerThread.IsBackground = true;
            consumerThread.IsBackground = true;

            producerThread.Start();
            consumerThread.Start();
        }

        public bool isQueueEmpty()
        {
            return _syncEvents.waiting;
        }

        public void wait()
        {
            lock (waitAck)
            {
                while (_queryQueue.Count > 0)
                {
                    Monitor.Wait(waitAck);
                }
            }
        }

        public void Enqueue(Delegate item)
        {
            _queryQueue.Enqueue(item);
        }
    }

The code runs smoothly except for the wait() function. In some cases I want to wait until every function in the queue has finished running, so I wrote wait().

The producer fires the waitAck pulse at the appropriate time.

However, when the line "Monitor.Wait(waitAck);" runs inside wait(), all threads stop, including the producer and consumer threads.

Why does this happen, and how can I solve it? Thanks!

+2  A: 

It seems very unlikely that all the threads will actually stop, although I should point out that to avoid false wake-ups you should probably have a while loop instead of an if statement:

lock (waitAck)
{
    while(queryProducer.secondQueue.Count > 0)
    {
        Monitor.Wait(waitAck);
    }
}

The fact that you're calling Monitor.Wait means that waitAck should be released so it shouldn't prevent the consumer threads from locking...

Could you give more information about the way in which the producer/consumer threads are "stopping"? Does it look like they've just deadlocked?

Is your producer using Pulse or PulseAll? You've got an extra waiting thread now, so if you only use Pulse it's only going to release a single thread... it's hard to see whether or not that's a problem without the details of your Producer and Consumer classes.

If you could show a short but complete program to demonstrate the problem, that would help.

EDIT: Okay, now you've posted the code I can see a number of issues:

  • Having so many public variables is a recipe for disaster. Your classes should encapsulate their functionality so that other code doesn't have to go poking around for implementation bits and pieces. (For example, your calling code here really shouldn't have access to the queue.)

  • You're adding items directly to the second queue, which means you can't efficiently wake up the producer to add them to the first queue. Why do you even have multiple queues?

  • You're always waiting on _sync in the producer thread... why? What's going to notify it to start with? Generally speaking the producer thread shouldn't have to wait, unless you have a bounded buffer.

  • You have a static variable (_waitAck) which is being overwritten every time you create a new instance. That's a bad idea.

You also haven't shown your SyncEvents class - is that meant to be doing anything interesting?

To be honest, it seems like you've got quite a strange design - you may well be best starting again from scratch. Try to encapsulate the whole producer/consumer queue in a single class, which has Produce and Consume methods, as well as WaitForEmpty (or something like that). I think you'll find the synchronization logic a lot easier that way.
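As a rough illustration of that single-class shape, here is a minimal sketch. The names WorkQueue, _gate and _running are made up for the example, and it takes Action rather than Delegate purely to keep it short; treat it as one possible layout, not as the fix for the posted code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: one class owns the queue, the lock and the worker thread.
public sealed class WorkQueue
{
    private readonly Queue<Action> _items = new Queue<Action>();
    private readonly object _gate = new object();
    private int _running; // items dequeued but not yet finished

    public WorkQueue()
    {
        var worker = new Thread(ConsumeLoop) { IsBackground = true };
        worker.Start();
    }

    // Producers never wait; they add an item and wake any waiters.
    public void Produce(Action item)
    {
        lock (_gate)
        {
            _items.Enqueue(item);
            Monitor.PulseAll(_gate);
        }
    }

    // Blocks until the queue is empty and no item is still executing.
    public void WaitForEmpty()
    {
        lock (_gate)
        {
            while (_items.Count > 0 || _running > 0)
            {
                Monitor.Wait(_gate);
            }
        }
    }

    private void ConsumeLoop()
    {
        while (true)
        {
            Action item;
            lock (_gate)
            {
                while (_items.Count == 0)
                {
                    Monitor.Wait(_gate);
                }
                item = _items.Dequeue();
                _running++;
            }

            try
            {
                item(); // run outside the lock so Produce is never blocked by a slow item
            }
            finally
            {
                // Exceptions thrown by an item are not handled in this sketch.
                lock (_gate)
                {
                    _running--;
                    Monitor.PulseAll(_gate); // wake WaitForEmpty callers
                }
            }
        }
    }
}
```

Because every wait is inside a while loop that rechecks its own condition, it does not matter that producers, the consumer and WaitForEmpty callers all share one monitor and are all woken by PulseAll.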

Jon Skeet
I have edited the code now. Thanks =]
mr.LiKaShing
Sorry for the messy code. The original version was not like this; I messed it up after running into the deadlock. I have tidied up the code now and hope you can take a look at it, since the problem is still there.
mr.LiKaShing
And the producer class shouldn't actually be waiting, since Monitor.Wait(_sync, 0) specifies a timeout of 0. It is there to release the lock for the consumer class.
mr.LiKaShing
@mr.LiKaShing: That's only going to release the lock very, very briefly. I don't *think* there's any guarantee that the consumer class will grab the lock if it's waiting for it. It's really not the way to go. Again, all this should be encapsulated in *one* class to start with, and the threads should only be waiting when they actually need to.
Jon Skeet
+1  A: 

Here is my take on your code:

public class ProducerConsumer
{
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;
    private static Object _sync = new Object();

    public ProducerConsumer(Queue<Delegate> queue)
    {
        lock (_sync)
        {
            // Note: I would recommend that you don't even
            // bother with taking in a queue.  You should be able
            // to just instantiate a new Queue<Delegate>()
            // and use it when you Enqueue.  There is nothing that
            // you really need to pass into the constructor.
            _queue = queue;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public void Enqueue(Delegate value)
    {
        lock (_sync)
        {
            _queue.Enqueue(value);
            _ready.Set();
        }
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        Delegate query;
        try
        {
            while (true)
            {
                _ready.WaitOne();
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }


    // No base class is declared here, so this is a plain method rather
    // than an override; add ": IDisposable" to the class for `using` support.
    public void Dispose()
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
            }
        }
    }


}

I'm not exactly sure what you're trying to achieve with the wait function... I'm assuming you're trying to put some type of a limit to the number of items that can be queued. In that case simply throw an exception or return a failure signal when you have too many items in the queue, the client that is calling Enqueue will keep retrying until the queue can take more items. Taking an optimistic approach will save you a LOT of headaches and it simply helps you get rid of a lot of complex logic.
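To make the "return a failure signal" idea concrete, here is a minimal sketch of a size-limited queue. The class name BoundedQueue, the capacity parameter and the TryEnqueue/TryDequeue names are assumptions for the example and do not appear in the code above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: Enqueue reports failure instead of blocking the caller.
public sealed class BoundedQueue
{
    private readonly Queue<Delegate> _queue = new Queue<Delegate>();
    private readonly object _sync = new object();
    private readonly int _capacity;

    public BoundedQueue(int capacity)
    {
        _capacity = capacity;
    }

    // Returns false when the queue is full; the caller decides when to retry.
    public bool TryEnqueue(Delegate value)
    {
        lock (_sync)
        {
            if (_queue.Count >= _capacity)
            {
                return false;
            }
            _queue.Enqueue(value);
            return true;
        }
    }

    // Returns false when there is nothing to take.
    public bool TryDequeue(out Delegate value)
    {
        lock (_sync)
        {
            if (_queue.Count == 0)
            {
                value = null;
                return false;
            }
            value = _queue.Dequeue();
            return true;
        }
    }
}
```

The trade-off is that the retry policy moves to the caller, which keeps the queue itself free of wait/pulse logic.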

If you REALLY want to have the wait in there, then I can probably help you figure out a better approach. Let me know what are you trying to achieve with the wait and I'll help you out.

Note: I took this code from one of my projects, modified it a little and posted it here... there might be some minor syntax errors, but the logic should be correct.

UPDATE: Based on your comments I made some modifications: I added another ManualResetEvent to the class, so when you call BlockQueue() it gives you an event which you can wait on and sets a flag to stop the Enqueue function from queuing more elements. Once all the queries in the queue are serviced, the flag is set to true and the _wait event is set so whoever is waiting on it gets the signal.

public class ProducerConsumer
{
    private bool _canEnqueue;
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;

    private static Object _sync = new Object();
    private static ManualResetEvent _wait = new ManualResetEvent(false);

    public ProducerConsumer()
    {
        lock (_sync)
        {
            _queue = new Queue<Delegate>();
            _canEnqueue = true;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public bool Enqueue(Delegate value)
    {
        lock (_sync)
        {
            // Only enqueue while the queue is not blocked by BlockQueue()
            if( _canEnqueue )
            {
                _queue.Enqueue(value);
                _ready.Set();
                return true;
            }
        }
        // Whoever is calling Enqueue should try again later.
        return false;
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        try
        {
            while (true)
            {
                // Wait for a query to be enqueued
                _ready.WaitOne();

                // Process the query
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        Delegate query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _canEnqueue = true;
                        _ready.Reset();
                        _wait.Set();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }

    // Block your queue from enqueuing, return null
    // if the queue is already empty.
    public ManualResetEvent BlockQueue()
    {
        lock(_sync)
        {
            if( _queue.Count > 0 )
            {
                _canEnqueue = false;
                _wait.Reset();
            }
            else
            {
                // You need to tell the caller that they can't
                // block your queue while it's empty. The caller
                // should check if the result is null before calling
                // WaitOne().
                return null;
            }
        }
        return _wait;
    }

    // No base class is declared here, so this is a plain method rather
    // than an override; add ": IDisposable" to the class for `using` support.
    public void Dispose()
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
                // Set wait when you're disposing the queue
                // so that nobody is left with a lingering wait.
                _wait.Set();
            }
        }
    }
}
Lirik
Thanks! It is a tidier implementation. My situation is: a main (UI) thread is running, and a background thread is querying data from the database. In one scenario the UI wants to export the data while the background thread is still retrieving it, and I want to wait until the background thread has finished all its work, so I need the wait function. I tried to improve my code according to Jon's advice, and I found it works... but only if the function the delegate points to does not interact with a component that lives on the main thread, maybe because the main thread is blocked by Monitor.Wait().
mr.LiKaShing
And I am trying another approach to solve the problem.
mr.LiKaShing
@mr.LiKaShing I'm assuming you have a way to keep the UI thread responsive while you're making it wait for the queries to complete. Also be aware of the tricky situation here: you're not stopping the Enqueue function from queuing more queries, so your wait might be extended quite a bit.
Lirik