Hi

I have two threads: one processes a queue and the other adds items to it.

  1. I want to put the queue-processing thread to sleep when it has finished processing the queue.
  2. I want the second thread to wake it up when it has added an item to the queue.

However, these functions throw System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of code on the Monitor.PulseAll(waiting); call, because I haven't synchronized the function with the waiting object (which I don't want to do; I want to be able to process while adding items to the queue). How can I achieve this?

Queue<object> items = new Queue<object>();
object waiting = new object();

1st Thread

public void ProcessQueue()
{
    while (true)
    {
        if (items.Count == 0)
            Monitor.Wait(waiting);

        object real = null;
        lock (items)
        {
            object item = items.Dequeue();
            real = item;
        }
        if (real == null)
            continue;
        // .. bla bla bla
    }
}

2nd Thread involves

public void AddItem(object o)
{
    // ... bla bla bla
    lock (items)
    {
        items.Enqueue(o);
    }
    Monitor.PulseAll(waiting);
}
+1  A: 

Use a Semaphore (http://msdn.microsoft.com/library/system.threading.semaphore.aspx); it was designed for exactly this.
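
A minimal sketch of how the semaphore approach might look, assuming the semaphore's count tracks the number of items currently in the queue. The SemaphoreQueue class and its TakeItem method are hypothetical names used for illustration; they are not from the question.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper class; the name is an assumption for this sketch.
public class SemaphoreQueue
{
    private readonly Queue<object> items = new Queue<object>();

    // The count mirrors the number of queued items; it starts at zero.
    private readonly Semaphore available = new Semaphore(0, int.MaxValue);

    // Producer side: enqueue under the lock, then signal one waiting consumer.
    public void AddItem(object o)
    {
        lock (items)
        {
            items.Enqueue(o);
        }
        available.Release();
    }

    // Consumer side: blocks while the queue is empty, then dequeues one item.
    public object TakeItem()
    {
        available.WaitOne();
        lock (items)
        {
            return items.Dequeue();
        }
    }
}
```

The lock on items is held only for the enqueue or dequeue itself, so the consumer can process an item while the producer adds more.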

Andrey
Oh right. OK, thanks :D I learnt about monitors in my concurrency class, and it suggested that monitors were used predominantly. I learnt with Java, which has monitors implemented without the requirement of actually 'monitoring' the object already.
Kurru
Semaphores would do the trick, but they're heavyweight for the task. They require a thunk into kernel mode, which'll cost you.
Kennet Belenky
A: 

I prefer to use a callback that launches a processing thread that continues until it's caught up, with locks causing simultaneous readers and writers to wait in line:

using System.Collections.Generic;
using System.Threading;

public delegate void CallbackDelegate();

class Program
{
    static void Main(string[] args)
    {
        Queue<object> items = new Queue<object>();

        Processor processor = new Processor(items);
        Adder adder = new Adder(items, new CallbackDelegate(processor.CallBack));

        Thread addThread = new Thread(new ParameterizedThreadStart(adder.AddItem));
        object objectToAdd = new object();
        addThread.Start(objectToAdd);
    }
}

class Processor
{
    Queue<object> items;

    public Processor(Queue<object> itemsArg)
    {
        items = itemsArg;
    }

    public void ProcessQueue()
    {
        lock (items)
        {
            while (items.Count > 0)
            {
                object real = items.Dequeue();
                // process real
            }
        }
    }

    public void CallBack()
    {
        Thread processThread = new Thread(ProcessQueue);
        processThread.IsBackground = true;
        processThread.Start();
    }
}

class Adder
{
    Queue<object> items;
    CallbackDelegate callback;

    public Adder(Queue<object> itemsArg, CallbackDelegate callbackArg)
    {
        items = itemsArg;
        callback = callbackArg;
    }

    public void AddItem(object o)
    {
        lock (items) { items.Enqueue(o); }
        callback();
    }
}
ebpower
+1  A: 

If you have access to .NET 4.0, what you want can be achieved with BlockingCollection<T>.
If you want to do it yourself with the Monitor class and signaling with Pulse(), you are actually on the right track.
You get the exception because you must own the lock on the specified object to call Wait(), Pulse() and PulseAll(). Your code never takes the lock on waiting.
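
For the BlockingCollection<T> route, a minimal sketch might look like the following. The BlockingCollectionDemo class and its Run method are hypothetical names for illustration; the consumer simply sums the items so the result is easy to check.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class; the names are assumptions for this sketch.
public static class BlockingCollectionDemo
{
    // Produces the integers 1..itemCount and returns the sum seen by the consumer.
    public static int Run(int itemCount)
    {
        using (var items = new BlockingCollection<int>())
        {
            // Consumer: GetConsumingEnumerable blocks while the collection is
            // empty and ends once CompleteAdding was called and it has drained.
            Task<int> consumer = Task.Factory.StartNew(() =>
            {
                int sum = 0;
                foreach (int item in items.GetConsumingEnumerable())
                {
                    sum += item;
                }
                return sum;
            });

            // Producer: no explicit lock, Wait or Pulse is needed.
            for (int i = 1; i <= itemCount; i++)
            {
                items.Add(i);
            }
            items.CompleteAdding();

            // Wait for the consumer to finish before disposing the collection.
            return consumer.Result;
        }
    }
}
```

Calling BlockingCollectionDemo.Run(4) returns 10, the sum of 1 through 4.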

A sample basic thread-safe queue that can be used:

  • with foreach on the consumer,
  • with while or your favorite conditional construct on the producer side,
  • handles multiple producers/consumers and
  • uses lock(), Monitor.Pulse(), Monitor.PulseAll() and Monitor.Wait():


public class SignaledQueue<T>
{
    Queue<T> queue = new Queue<T>();
    volatile bool shutDown = false;

    public bool Enqueue(T item)
    {
        if (!shutDown)
        {
            lock (queue)
            {
                queue.Enqueue(item);
                //Pulse only if there can be waiters.
                if (queue.Count == 1)
                {
                    Monitor.PulseAll(queue);
                }
            }
            return true;
        }
        //Indicate that processing should stop.
        return false;
    }

    public IEnumerable<T> DequeueAll()
    {
        while (!shutDown)
        {
            do
            {
                T item;
                lock (queue)
                {
                    //If the queue is empty, wait.
                    if (queue.Count == 0)
                    {
                        if (shutDown) break;
                        Monitor.Wait(queue);
                        if (queue.Count == 0) break;
                    }
                    item = queue.Dequeue();
                }
                yield return item;
            } while (!shutDown);
        }
    }

    public void SignalShutDown()
    {
        shutDown = true;
        lock (queue)
        {
            //Signal all waiting consumers with PulseAll().
            Monitor.PulseAll(queue);
        }
    }

}

Sample usage:

class Program
{
    static void Main(string[] args)
    {
        int numProducers = 4, numConsumers = 2;
        SignaledQueue<int> queue = new SignaledQueue<int>();

        ParameterizedThreadStart produce = delegate(object obj)
        {
            Random rng = new Random((int)obj);
            int num = 0;
            while (queue.Enqueue(++num))
            {
                Thread.Sleep(rng.Next(100));
            } 
        };

        ThreadStart consume = delegate
        {
            foreach (int num in queue.DequeueAll())
            {
                Console.Write(" {0}", num);
            }
        };

        Random seedRng = new Random();
        for (int i = 0; i < numProducers; i++)
        {
            new Thread(produce).Start(seedRng.Next());
        }

        for (int i = 0; i < numConsumers; i++)
        {
            new Thread(consume).Start();
        }

        Console.ReadKey(true);
        queue.SignalShutDown();

    }
}
andras
+1  A: 

The answer is in the error message you posted: "Object synchronization method was called from an unsynchronized block of code", raised on the Monitor.PulseAll(waiting); call.

You have to call Monitor.PulseAll(waiting) from inside the lock(waiting) block.

Also... you have to call Monitor.Wait from within a lock block as well.
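
A minimal sketch of the question's code with both calls moved inside a lock on waiting. The WorkQueue class and TakeItem method are hypothetical names wrapping the question's items and waiting fields.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical class name; it wraps the fields from the question.
public class WorkQueue
{
    private readonly Queue<object> items = new Queue<object>();
    private readonly object waiting = new object();

    // Consumer side: blocks until an item is available, then returns it.
    public object TakeItem()
    {
        lock (waiting)
        {
            // Wait releases the lock while sleeping and reacquires it on wake-up.
            // Re-check in a loop: another consumer may already have taken the item.
            while (items.Count == 0)
            {
                Monitor.Wait(waiting);
            }
            return items.Dequeue();
        }
    }

    // Producer side: the lock is held only for the enqueue and the pulse.
    public void AddItem(object o)
    {
        lock (waiting)
        {
            items.Enqueue(o);
            Monitor.PulseAll(waiting);
        }
    }
}
```

The item is processed after TakeItem returns, outside the lock, so the producer can keep adding items while the consumer is working.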

Kennet Belenky