views:

40

answers:

3

Can anyone see any problems with this Producer/Consumer unique keyed buffer impl? The idea is if you add items for processing with the same key only the lastest value will be processed and the old/existing value will be thrown away.

public sealed class PCKeyedBuffer<K,V>
{
    private readonly object _locker = new object();
    private readonly Thread _worker;
    private readonly IDictionary<K, V> _items = new Dictionary<K, V>();
    private readonly Action<V> _action;
    private volatile bool _shutdown;

    public PCKeyedBuffer(Action<V> action)
    {
        _action = action;
        (_worker = new Thread(Consume)).Start();
    }

    public void Shutdown(bool waitForWorker)
    {
        _shutdown = true;
        if (waitForWorker)
                _worker.Join();
    }

    public void Add(K key, V value)
    {
        lock (_locker)
        {
            _items[key] = value;
            Monitor.Pulse(_locker);
        }
    }

    private void Consume()
    {
        while (true)
        {
            IList<V> values;
            lock (_locker)
            {
                while (_items.Count == 0) Monitor.Wait(_locker);
                values = new List<V>(_items.Values);
                _items.Clear();
            }
            foreach (V value in values)
            {
                 _action(value);
            }

            if(_shutdown) return;
        }
    }
}


    static void Main(string[] args)
    {
        PCKeyedBuffer<string, double> l = new PCKeyedBuffer<string, double>(delegate(double d)
                                                                                {
                                                                                    Thread.Sleep(10);
                                                                                    Console.WriteLine(
                                                                                        "Processed: " + d.ToString());
                                                                                });
        for (double i = 0; i < 100; i++)
        {
            l.Add(i.ToString(), i);
        }
        for (double i = 0; i < 100; i++)
        {
            l.Add(i.ToString(), i);
        }
        for (double i = 0; i < 100; i++)
        {
            l.Add(i.ToString(), i);
        }

        Console.WriteLine("Done Enqeueing");
        Console.ReadLine();
    }
+2  A: 

After a quick once over I would say that the following code in the Consume method

while (_items.Count == 0) Monitor.Wait(_locker);

Should probably Wait using a timeout and check the _shutdown flag each iteration. Especially since you are not setting your consumer thread to be aq background thread.

In addition, the Consume method does not appear very scalable, since it single handedly tries to process an entire queue of items. Of course this might depend on the rate that items are being produced. I would probably have the consumer focus on a single item in the list and then use TPL to run multiple concurrent consumers, this way you can take advantage of multple cores while letting TPL balance the work load for you. To reduce the required locking for the consumer processing a single item you could use a ConcurrentDictionary

Chris Taylor
+1  A: 

As Chris pointed out, ConcurrentDictionary already exists and is more scalable. It was added to the base libraries in .NET 4.0, and is also available as an add-on to .NET 3.5.

Stephen Cleary
A: 

This is one of the few attempts at creating a custom producer/consumer that is actually correct. So job well done in that regard. However, like Chris pointed out your stop flag will be ignored while Monitor.Wait is blocked. There is no need to rehash his suggestion for fixing that. The advice I can offer is to use a BlockingCollection instead of doing the Wait/Pulse calls manually. That would also solve the shutdown problem since the Take method is cancellable. If you are not using .NET 4.0 then it available in the Reactive Extension download that Stephen linked to. If that is not an option then Stephen Toub has a correct implementation here (except his is not cancellable, but you can always do a Thread.Interrupt to safely unblock it). What you can do is feed in KeyValuePair items into the queue instead of using a Dictionary.

Brian Gideon