I have one thread streaming data and a second (from the thread pool) processing it. Processing takes around 100 ms, so I use the second thread so as not to hold up the first.

While the second thread is processing, the first thread adds incoming data to a dictionary cache; then, when the second thread has finished, the cached values are processed.

My question: is this how I should be writing producer/consumer code in C#?

public delegate void OnValue(ulong value);

public class Runner
{
    public event OnValue OnValueEvent;
    private readonly IDictionary<string, ulong> _cache = new Dictionary<string, ulong>(StringComparer.InvariantCultureIgnoreCase);
    private readonly AutoResetEvent _cachePublisherWaitHandle = new AutoResetEvent(true);

    public void Start()
    {
        for (ulong i = 0; i < 500; i++)
        {
            DataStreamHandler(i.ToString(), i);
        }
    }

    private void DataStreamHandler(string id, ulong value)
    {
        _cache[id] = value;

        if (_cachePublisherWaitHandle.WaitOne(1))
        {
            IList<ulong> tempValues = new List<ulong>(_cache.Values);
            _cache.Clear();

            _cachePublisherWaitHandle.Reset();

            ThreadPool.UnsafeQueueUserWorkItem(delegate
            {
                try
                {
                    foreach (ulong value1 in tempValues)
                        if (OnValueEvent != null)
                            OnValueEvent(value1);
                }
                finally
                {
                    _cachePublisherWaitHandle.Set();
                }
            }, null);
        }
        else
        {
            Console.WriteLine(string.Format("Buffered value: {0}.", value));
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        Stopwatch sw = Stopwatch.StartNew();
        Runner r = new Runner();
        r.OnValueEvent += delegate(ulong x)
                              {
                                  Console.WriteLine(string.Format("Processed value: {0}.", x));
                                  Thread.Sleep(100);

                                  if(x == 499)
                                  {
                                      sw.Stop();
                                      Console.WriteLine(string.Format("Time: {0}.", sw.ElapsedMilliseconds));
                                  }
                              };
        r.Start();
        Console.WriteLine("Done");
        Console.ReadLine();
    }
}
A: 

There is a good article on MSDN about Synchronizing the Producer and Consumer. There is also a good example on the Albahari site.

SwDevMan81
The Albahari site is correct. Sadly, that MSDN article has an incorrect implementation that can lead to a live-lock in situations where you want more than one consumer.
Brian Gideon
+1  A: 

The best practice for setting up the producer-consumer pattern is to use the BlockingCollection class, which is available in .NET 4.0 or as a separate download as part of the Reactive Extensions framework. The idea is that producers enqueue using the Add method and consumers dequeue using the Take method, which blocks if the queue is empty. As SwDevMan81 pointed out, the Albahari site has a really good write-up on how to make it work correctly if you want to go the manual route.
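
As a rough illustration of the idea, here is a minimal sketch assuming .NET 4.0 or later. The class name BlockingCollectionSketch, the Run helper, and the bounded capacity of 100 are made up for the example and are not from the question. It uses GetConsumingEnumerable instead of calling Take in a loop; both block while the queue is empty, but the enumerable ends cleanly once CompleteAdding has been called. Note that, unlike the dictionary cache in the question, this processes every value rather than only the latest value per key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionSketch
{
    // Hypothetical helper: pushes itemCount values through a bounded queue
    // and returns them in the order the single consumer saw them.
    public static List<ulong> Run(int itemCount)
    {
        var processed = new List<ulong>();

        // Assumed capacity of 100: Add blocks the producer when the queue is full.
        using (var queue = new BlockingCollection<ulong>(boundedCapacity: 100))
        {
            // Consumer: blocks while the queue is empty and exits the loop
            // once CompleteAdding has been called and the queue is drained.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (ulong value in queue.GetConsumingEnumerable())
                {
                    processed.Add(value); // the slow processing step would go here
                }
            }, TaskCreationOptions.LongRunning);

            // Producer: the streaming thread just enqueues and moves on.
            for (ulong i = 0; i < (ulong)itemCount; i++)
            {
                queue.Add(i);
            }

            queue.CompleteAdding(); // signal that no more items will arrive
            consumer.Wait();        // wait for the consumer to drain the queue
        }

        return processed;
    }

    public static void Main()
    {
        List<ulong> result = Run(500);
        Console.WriteLine("Processed {0} values.", result.Count);
    }
}
```

With a single consumer the values come out in the order they were added. If only the most recent value per id matters, as with the cache in the question, the consumer would need its own coalescing step.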

Brian Gideon