Hello, I want two threads to collaborate: a producer and a consumer. The consumer is rather slow, and the producer is very fast and works in bursts.

For example, the consumer can process one message per 20 seconds, and the producer can produce 10 messages in one second, but it does so only once in a long while, so the consumer can catch up.

I want something like:

Stream commonStream;
AutoResetEvent commonLock;

void Producer()
{
  while (true)
  {
    magic.BlockUntilMagicAvalible();
    byte[] buffer = magic.Produce();
    commonStream.Write(buffer);
    commonLock.Set();
  }
}

void Consumer()
{
  while(true)
  { 
    commonLock.WaitOne();
    MagicalObject o = binarySerializer.Deserialize(commonStream);
    DoSomething(o);
  }
}
A: 

I would read the following articles; they describe your problem. Basically, you're not getting the right isolation for your unit of work.

http://blogs.msdn.com/b/ricom/archive/2006/04/24/582643.aspx
http://blogs.msdn.com/b/ricom/archive/2006/04/26/584802.aspx

Conrad Frix
Yeah, it's kind of what I wish I could write...
Hellfrost
+2  A: 

If you have .NET 4.0, you can do it this way:

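// const so the field initializer below is allowed to reference it.
const int maxBufferCap = 500;
// Bounded buffer: Add blocks once maxBufferCap items are waiting.
System.Collections.Concurrent.BlockingCollection<MagicalObject> Collection = new System.Collections.Concurrent.BlockingCollection<MagicalObject>(maxBufferCap);

void Producer()
{
    while (magic.HasMoreMagic)
    {
        Collection.Add(magic.ProduceMagic());
    }
    // Signal that no more items will arrive so the consumer's foreach can end.
    Collection.CompleteAdding();
}

void Consumer()
{
    // Blocks while the collection is empty; exits after CompleteAdding once it is drained.
    foreach (MagicalObject o in Collection.GetConsumingEnumerable())
    {
        DoSomething(o);
    }
}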

The foreach line will block if there is no data in the buffer, and it wakes up automatically when you add something to the collection. The maximum buffer size is useful because your producer is so much faster than the consumer: when the buffer is full, the producer blocks on Collection.Add until there is room again.

The very nice thing about this setup is that, if you are not too worried about ordering and you can run more than one DoSomething() at a time, you can create multiple consumers all pulling from the same collection.
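
A minimal sketch of that multiple-consumer variant, under some assumptions: the class MultiConsumerSketch, the Run method, the int items, the buffer bound of 5 and the Thread.Sleep stand-in for DoSomething are all hypothetical names and values chosen for illustration, not part of the question's code. Only the BlockingCollection calls (Add, CompleteAdding, GetConsumingEnumerable) come from the approach above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class MultiConsumerSketch
{
    // Hypothetical helper: produces itemCount integers, drains them with
    // consumerCount consumers, and returns how many items were processed.
    public static int Run(int itemCount, int consumerCount)
    {
        int processed = 0;
        // Small bound (an assumption) so the producer really blocks in Add.
        using (var collection = new BlockingCollection<int>(5))
        {
            var consumers = new Task[consumerCount];
            for (int i = 0; i < consumerCount; i++)
            {
                consumers[i] = Task.Factory.StartNew(() =>
                {
                    // Blocks while the collection is empty; the loop ends once
                    // CompleteAdding has been called and the buffer is drained.
                    foreach (int item in collection.GetConsumingEnumerable())
                    {
                        Thread.Sleep(10); // stand-in for the slow DoSomething work
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning);
            }

            // Producer burst: Add blocks whenever the buffer is full.
            for (int i = 0; i < itemCount; i++)
            {
                collection.Add(i);
            }
            collection.CompleteAdding();

            // Wait until every consumer has left its foreach loop.
            Task.WaitAll(consumers);
        }
        return processed;
    }
}
```

Each item is handed to exactly one consumer, so MultiConsumerSketch.Run(20, 3) returns 20, but the order in which items finish is not guaranteed once more than one consumer is running.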

Scott Chamberlain
Note that the TPL has been back-ported to .NET 3.5: http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in-net-35-using.html
Dan Bryant
A: 

You can get what you want using a queue and a timer. The producer adds values to the queue and starts the consumer timer. The consumer timer's Elapsed event (which is raised on a ThreadPool thread) stops the timer and loops through the queue until it's empty, then returns (no unnecessary polling). The producer can add to the queue while the consumer is still running.

System.Timers.Timer consumerTimer;
Queue<byte[]> queue = new Queue<byte[]>();

void Producer()
{
    consumerTimer = new System.Timers.Timer(1000);
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed);
    while (true)
    {
        magic.BlockUntilMagicAvailable();
        lock (queue)
        {
            queue.Enqueue(magic.Produce());
            // Only (re)start the timer if no consumer run is already pending.
            if (!consumerTimer.Enabled)
            {
                consumerTimer.Start();
            }
        }
    }
}

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
    while (true)
    {
        // Stop the timer so it does not keep firing while the queue is drained.
        consumerTimer.Stop();
        lock (queue)
        {
            if (queue.Count > 0)
            {
                // Note: the lock is held while DoSomething runs, so the producer
                // waits at its own lock until the current item is finished.
                DoSomething(queue.Dequeue());
            }
            else
            {
                break;
            }
        }
    }
}
ebpower
Your snippet isn't thread safe... and mine implies no polling.
Hellfrost
What isn't thread safe about it? And it doesn't poll - the timer is a one-shot that's only activated when the producer adds to the queue.
ebpower