I'm trying to use the producer-consumer pattern to process and save some data. I'm using an AutoResetEvent for signalling between the two threads. Here is the code I have.

Here is the producer function

    public Results[] Evaluate()
    {
        processingComplete = false;
        resultQueue.Clear();
        for (int i = 0; i < data.Length; ++i)
        {
            if (saveThread.ThreadState == ThreadState.Unstarted)
                saveThread.Start();
            //-....
            //Process data
            //
            lock (lockobject)
            {
                resultQueue.Enqueue(result);
            }

            signal.Set();
        }
        processingComplete = true;
    }

And here is the consumer function

    private void SaveResults()
    {
        Model dataAccess = new Model();

        while (!processingComplete || resultQueue.Count > 0)
        {
            if (resultQueue.Count == 0)
                signal.WaitOne();
            ModelResults result;
            lock (lockobject)
            {
                result = resultQueue.Dequeue();
            }
            dataAccess.Save(result);
        }
        SaveCompleteSignal.Set();
    }

So my issue is that resultQueue.Dequeue() sometimes throws an InvalidOperationException because the queue is empty. I'm not sure what I'm doing wrong. Shouldn't the signal.WaitOne() above it block while the queue is empty?

+1  A: 

You check the queue's Count outside of a synchronized context. Since Queue is not thread-safe, this can be a problem (for instance, while an Enqueue is in progress, Count may already return 1 although no item can be dequeued yet), and it would go seriously wrong if you were to use more than one consumer.

You may want to read the threading articles written by Joseph Albahari. He also has a good sample for your problem, as well as a "better" solution without OS synchronization objects.
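
For illustration, here is a minimal sketch of a queue that blocks using only Monitor.Wait and Monitor.Pulse, which is presumably the kind of solution meant above (that is an assumption, and this is not Albahari's code). The class name PulseQueue and its members are hypothetical and do not appear in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not from the original post: a blocking queue built on Monitor.
public sealed class PulseQueue<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> items = new Queue<T>();
    private bool completed;

    public void Enqueue(T item)
    {
        lock (gate)
        {
            items.Enqueue(item);
            Monitor.Pulse(gate);       // wake one waiting consumer
        }
    }

    // Call once the producer has no more items to add.
    public void Complete()
    {
        lock (gate)
        {
            completed = true;
            Monitor.PulseAll(gate);    // wake every waiting consumer so it can exit
        }
    }

    // Blocks while the queue is empty and not completed.
    // Returns false once the queue is drained and Complete() has been called.
    public bool TryDequeue(out T item)
    {
        lock (gate)
        {
            while (items.Count == 0 && !completed)
                Monitor.Wait(gate);    // releases the lock while waiting, reacquires it on wake-up

            if (items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = items.Dequeue();
            return true;
        }
    }
}
```

With something like this, the consumer loop reduces to calling TryDequeue until it returns false and saving each item. The count check, the wait and the dequeue all happen under the same lock, so the consumer can never dequeue from an empty queue.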

Lucero
+2  A: 

You have synchronization issues due to a lack of proper locking.

You should lock all of the queue access, including the count check.

In addition, using Thread.ThreadState in this manner is a "bad idea". From the MSDN docs for ThreadState:

"Thread state is only of interest in debugging scenarios. Your code should never use thread state to synchronize the activities of threads."

You can't rely on this as a means of handling synchronization. You should redesign to make sure the thread will be started before it's used. If it's not started, just don't initialize it. (You can always use a null check - if the thread's null, create it and start it).
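
As a rough sketch of the null-check approach, assuming the worker thread is owned by a small wrapper (the LazySaver class and its members are hypothetical, not part of the question's code):

```csharp
using System;
using System.Threading;

// Hypothetical wrapper, not from the original post: starts the worker thread
// at most once without ever inspecting Thread.ThreadState.
public sealed class LazySaver
{
    private readonly object startLock = new object();
    private readonly ThreadStart work;
    private Thread saveThread;   // stays null until the first EnsureStarted call

    public LazySaver(ThreadStart work)
    {
        this.work = work;
    }

    public void EnsureStarted()
    {
        lock (startLock)
        {
            // The null check replaces the ThreadState.Unstarted test.
            if (saveThread == null)
            {
                saveThread = new Thread(work);
                saveThread.Start();
            }
        }
    }

    // Waits for the worker to finish, if it was ever started.
    public void Join()
    {
        Thread t;
        lock (startLock) { t = saveThread; }
        if (t != null) t.Join();
    }
}
```

Calling EnsureStarted once before the processing loop, rather than on every iteration, would also work and keeps the loop body free of thread management.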

Reed Copsey
A: 

You have to put lock() around every reference to the queue, including the Count check. You also have an issue with detecting that processing is complete: the consumer can receive a signal while the queue is empty, so it has to handle that case rather than dequeue blindly.

    public Results[] Evaluate()
    {
        lock (lockobject)
        {
            processingComplete = false;
            resultQueue.Clear();
        }
        for (int i = 0; i < data.Length; ++i)
        {
            if (saveThread.ThreadState == ThreadState.Unstarted)
                saveThread.Start();
            //-....
            //Process data
            //
            lock (lockobject)
            {
                resultQueue.Enqueue(result);
            }

            signal.Set();
        }
        lock (lockobject)
        {
            processingComplete = true;
        }
        // wake the consumer one last time so it can observe processingComplete
        signal.Set();
    }

    private void SaveResults()
    {
        Model dataAccess = new Model();

        while (true)
        {
            ModelResults result = default(ModelResults);
            bool hasResult = false;
            bool done;

            // check Count and Dequeue under the same lock so the queue
            // cannot change between the check and the dequeue
            lock (lockobject)
            {
                if (resultQueue.Count > 0)
                {
                    result = resultQueue.Dequeue();
                    hasResult = true;
                }
                done = processingComplete;
            }

            if (hasResult)
            {
                dataAccess.Save(result);
                continue;
            }

            // queue is empty and the producer has finished, processing is complete
            if (done)
                break;

            // queue is empty but more results may arrive, wait for the next signal
            signal.WaitOne();
        }
        SaveCompleteSignal.Set();
    }
Sam