System.Collections.Concurrent has some new collections that work very well in multithreaded environments. However, they are a bit limited. Either they block until an item becomes available, or they return immediately without an item (the TryXXX methods return false and set the out parameter to default(T)).

I need a collection that is thread safe, but instead of blocking the calling thread, it uses a callback to inform me that at least one item is available.

My current solution is to use a BlockingCollection, but to use the APM with a delegate to get the next element. In other words, I create a delegate to a method that Takes from the collection, and execute that delegate using BeginInvoke.

Unfortunately, I have to keep a lot of state within my class in order to accomplish this. Worse, the class is not thread safe; it can only be used by a single thread. I'm skirting the edge of maintainability, which I'd prefer not to do.

I know there are some libraries out there that make what I'm doing here pretty simple (I believe the Reactive Framework is one of these), but I'd like to accomplish my goals without adding any references outside of version 4 of the framework.

Are there any better patterns I can use that don't require outside references that accomplish my goal?


tl;dr:

Are there any patterns that satisfy the requirement:

"I need to signal a collection that I am ready for the next element, and have the collection execute a callback when that next element has arrived, without any threads being blocked."

+1  A: 

I think I have two possible solutions. I am not particularly satisfied with either, but they do at least provide a reasonable alternative to the APM approach.

The first does not meet your requirement of no blocking thread, but I think it is rather elegant because you can register callbacks and they will get called in round-robin fashion, but you still have the ability to call Take or TryTake as you normally would for a BlockingCollection. This code forces callbacks to be registered each time an item is requested. That is the signalling mechanism for the collection. The nice thing about this approach is that calls to Take do not get starved as they do in my second solution.

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

The second does meet your requirement of no blocking thread. Notice how it transfers the invocation of the callback to the thread pool. I did this because I am thinking that if it got executed synchronously then the locks would be held longer resulting in the bottlenecking of Add and RegisterForTake. I have looked it over closely and I do not think it can get live locked (both an item and a callback are available, but the callback never gets executed) but you might want to look it over yourself to verify. The only problem here is that a call to Take would get starved as callbacks always take priority.

public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}
Brian Gideon
Thanks for the answer, but it's not quite what I'm looking for. This is what I'm currently doing, but with the APM pushed into the collection (the code you've provided). I guess the crux of my issue is that APM doesn't fit my requirements; it's just the implementation I've used. My requirements demand a pattern which provides a solution to the question, "How can I signal a collection that I am ready for the next element, and have the collection execute a callback when that next element has arrived, without any threads being blocked?"
Will
I kind of figured that was not what you were after. It is an interesting problem though. Too bad `Add` is not `virtual`; otherwise you might have been able to inject the notification there somehow. Maybe you could use one of the blocking queue implementations as a starting point. The problem is that you have to be careful how you deliver that notification, otherwise another consumer will have grabbed the item first. I might play around with this today if I get time. Post an answer yourself if you figure it out. I don't know...you may find it easier to punt and just reference another library.
Brian Gideon
@Brian The notification should contain the next element and should be controlled by the notifier. Perhaps the idea of this being a collection is erroneous; only via this mechanism can the next item be supplied, thus avoiding the issue of two observers contending over a single item. In other words, one observer can't use mechanism A to get the next item (i.e., `T Pop()`) while the other has registered for a callback.
Will
@Will: Take a look now. I have two different ideas. I am a little concerned about the second example regarding a possible live lock situation, but as best I can tell it appears to be safe for multiple producers and multiple consumers.
Brian Gideon
@Bri thanks. I'll probably end up adapting some of this. I was hoping something similar to what I needed was in the framework, but so it goes.
Will
+2  A: 

How about something like this? (The naming could probably use some work. And note that this is untested.)

public class CallbackCollection<T>
{
    // Synchronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}
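
To illustrate how a consumer might use the class above, here is a minimal usage sketch. It is untested against the original code as posted; the `Demo` class and its `RoundTrip` helper are hypothetical names introduced only for this example, and the `CallbackCollection<T>` body is a condensed copy of the one above so that the sketch compiles on its own. The `ManualResetEventSlim` exists only so the demo can wait for the callback before exiting; a real consumer would not block on it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the CallbackCollection<T> shown above.
public class CallbackCollection<T>
{
    private readonly object _SyncObject = new object();
    private readonly ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();
    private readonly ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // No waiting callback: park the item until one registers.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }
        // Run the callback on the thread pool, outside the lock.
        Task.Factory.StartNew(() => callback(item));
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock (_SyncObject)
        {
            // No waiting item: park the callback until one is added.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        // Run the callback on the thread pool, outside the lock.
        Task.Factory.StartNew(() => callback(item));
    }
}

// Hypothetical demo class, not part of the original answer.
public static class Demo
{
    // Registers a callback first, then adds an item, and returns
    // the value the callback received.
    public static int RoundTrip(int value)
    {
        var collection = new CallbackCollection<int>();
        int received = 0;
        using (var done = new ManualResetEventSlim(false))
        {
            // Signal readiness for the next item; this call returns immediately.
            collection.TakeAndCallback(item => { received = item; done.Set(); });

            // The producer adds an item, which dispatches the parked callback.
            collection.Add(value);

            // Demo only: wait for the thread-pool callback to finish.
            done.Wait();
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip(42)); // prints 42
    }
}
```

Note that each call to `TakeAndCallback` delivers at most one item, so a consumer that wants a continuous stream would presumably re-register from inside its own callback.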
Jack Leitch
Just refreshed and saw @Brian's NotifyingBlockingCollection. Looks like he and I came up with roughly the same solution at the same time.
Jack Leitch
Yep, we were definitely thinking along the same lines here, especially the part about getting the invocation of the callback off the current thread.
Brian Gideon