views:

108

answers:

2

Update: As Brian pointed out, my original idea did indeed have a concurrency issue. This was somewhat obscured by the signature of the ConcurrentDictionary<TKey, TValue>.AddOrUpdate method, which can lull a lazy thinker (like myself) into believing that everything--the set add as well as the queue push--will somehow happen all at once, atomically (i.e., magically).

In retrospect, it was foolish of me to have this expectation. In fact, regardless of the implementation of AddOrUpdate, it should be clear that a race condition would still exist in my original idea, as Brian pointed out: pushing to the queue would happen before adding to the set, hence the following sequence of events could occur:

  1. Item pushed to queue
  2. Item popped from queue
  3. Item (not) removed from set
  4. Item added to set

The above sequence would result in an item in the set that isn't in the queue, effectively blacklisting that item from the data structure.

Now, I thought about this for a short while, and I'm beginning to think that the following approach may resolve these issues:

public bool Enqueue(T item)
{
    // This should:
    // 1. return true only when the item is first added to the set
    // 2. subsequently return false as long as the item is in the set;
    //    and it will not be removed until after it's popped
    if (_set.TryAdd(item, true))
    {
        _queue.Enqueue(item);
        return true;
    }

    return false;
}

Structuring it this way, the Enqueue call only happens once -- after the item is in the set. So duplicate items in the queue should not be a problem. And it seems that since the queue operations are "bookended" by the set operations--i.e., an item is pushed only after it's added to the set, and it's popped before it's removed from the set--the problematic sequence of events outlined above should not occur.

What do people think? Could it be that this resolves the problem? (Like Brian, I am inclined to doubt myself and guess that the answer is No, and I am missing something once again. But hey, it wouldn't be a fun challenge if it were easy, right?)


I have definitely seen similar questions posed here on SO, but surprisingly (considering how .NET-heavy this site is) they all seemed to be for Java.

I have need of essentially a set/queue combo class that is thread-safe. In other words, it should be a FIFO collection that does not allow duplicates (so if the same item is already in the queue, subsequent Enqueue calls will return false, until the item is popped from the queue).

I realize I could implement this quite easily with a simple HashSet<T> and a Queue<T> with locking in all the necessary places. However, I was interested in accomplishing it with the ConcurrentDictionary<TKey, TValue> and ConcurrentQueue<T> classes from .NET 4.0 (also available as part of Rx extensions for .NET 3.5, which is what I'm using), which I understand to be somehow lock-free collections*.

My basic plan was to implement this collection something like this:

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue;
    ConcurrentDictionary<T, bool> _set;

    public ConcurrentSetQueue(IEqualityComparer<T> comparer)
    {
        _queue = new ConcurrentQueue<T>();
        _set = new ConcurrentDictionary<T, bool>(comparer);
    }

    public bool Enqueue(T item)
    {
        // This should:
        // 1. if the key is not present, enqueue the item and return true
        // 2. if the key is already present, do nothing and return false
        return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
    }

    private bool EnqueueFirst(T item)
    {
        _queue.Enqueue(item);
        return true;
    }

    private bool EnqueueSecond(T item, bool dummyFlag)
    {
        return false;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out item))
        {
            // Another thread could come along here, attempt to enqueue, and
            // fail; however, this seems like an acceptable scenario since the
            // item shouldn't really be considered "popped" until it's been
            // removed from both the queue and the dictionary.
            bool flag;
            _set.TryRemove(item, out flag);

            return true;
        }

        return false;
    }
}

Have I thought this through properly? On the surface, I can't see any obvious bugs in this basic idea as I've written it above. But maybe I'm overlooking something. Or maybe the use of a ConcurrentQueue<T> with a ConcurrentDictionary<T, bool> isn't actually a wise choice, for reasons that have not occurred to me. Or maybe somebody else has already implemented this idea in a battle-proven library somewhere, and I should just use that.

Any thoughts or useful info on this subject would be much appreciated!

*Whether this is strictly accurate, I don't know; but performance tests have indicated to me that they do outperform comparable hand-rolled collections that use locking with many consumer threads.

+1  A: 

Have a look at Eric Lppert's blog. You might find something you like... http://blogs.msdn.com/b/ericlippert/archive/tags/immutability/

GregC
If you're looking to make sure your code is thread-safe, consider CHESS, a testing framework for concurrency verification.
GregC
That looks quite promising. Is there a particular article from Eric's blog that you would recommend as a starting point? (There are quite a few listed on that page you linked to.)
Dan Tao
The ones on immutable stack implementation is a great start. He grows that concept into immutable queues.
GregC
But that immutable queue does not prevent duplicates... And I think that will be difficult (impossible) to add.
Henk Holterman
+4  A: 

The short is no, the code presented in the question is not thread-safe.

The MSDN documentation is rather sparse on the AddOrUpdate method so I took a look at the AddOrUpdate method in Reflector. Here is the basic algorithm (I am not posting the Reflector output for legal reasons and its easy enough to do yourself).

TValue value;
do
{
  if (!TryGetValue(...))
  {
    value = AddValueFactoryDelegate(key);
    if (!TryAddInternal(...))
    {
      continue;
    }
    return value;
  }
  value = UpdateValueFactoryDelegate(key);
} 
while (!TryUpdate(...))
return value;

So clearly the AddValueFactoryDelegate and UpdateValueFactoryDelegate can execute more than once. There is no need for further explanation here. It should be quite obvious how this will break your code. I am actually a little shocked that the delegates can be executed multiple times. The documenation makes no mention of this. You would think that would be an incredibly important point so callers know to avoid passing delegates that have side-effects (as is your case).

But even if the delegates were guarenteed to only execute once there would still be a problem. It should be easy to visualize a problem sequence by replacing your Enqueue method with the contents of the AddOrUpdate method. The AddValueFactoryDelegate could execute and insert an item into _queue, but the thread could get interrupted by a context switch before adding the item to _set. A second thread could then call your TryDequeue method and extract that item from _queue, but fail to remove it from _set since it is not in there yet.

Update:

Okay, I do not think it is possible to get it to work. ConcurrentQueue is missing one crucial operation. I believe you need a CAS equivalent for the TryDequeue method. If such an operation existed then I think the following code would be correct. I use the mythical TryDequeueCas method which accepts a comparison value that is used as condition to say perform this operation atomically if and only if the top item on the queue equals the comparison value. The idea is exactly the same used in the Interlocked.CompareExchange method.

Notice how the code uses the bool value in the ConcurrentDictionary as a "virtual" lock to synchronize the coordination of the queue and the dictionary. The data structure also contains the CAS equivalent operation TryUpdate which is exploited to acquire and release this "virtual" lock. And because the lock is "virtual" and does not actually block concurrent access the while loop in the TryDequeue method is mandatory. That fits the canonical pattern of CAS operations in that they are usually performed in a loop until they succeed.

The code also uses the .NET 4.0 style try-finally pattern for lock acquire symantics to help guard against the problems caused by out-of-band (asynchronous) exceptions.

Note: Again, the code uses the mythical ConcurrentQueue.TryDequeueCas method.

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    ConcurrentDictionary<T, bool> _set = new ConcurrentDictionary<T, bool>();

    public ConcurrentSetQueue()
    {
    }

    public bool Enqueue(T item)
    {
        bool acquired = false;
        try
        {
            acquired = _set.TryAdd(item, true);
            if (acquired)
            {
                _queue.Enqueue(item);
                return true;
            }
            return false;
        }
        finally
        {
            if (acquired) _set.TryUpdate(item, false, true);
        }
    }

    public bool TryDequeue(out T item)
    {
        while (_queue.TryPeek(out item))
        {
            bool acquired = false;
            try
            {
                acquired = _set.TryUpdate(item, true, false);
                if (acquired)
                {
                    if (_queue.TryDequeueCas(out item, item))
                    {
                        return true;
                    }
                }
            }
            finally
            {
                if (acquired) _set.TryRemove(item, out acquired);
            }
        }
        item = default(T);
        return false;
    }
}

Update 2:

In reference to your modification notice how similiar it is when compared to mine. In fact, if you remove all of the fluff from my variation the Enqueue method has the exact same sequence of statements.

I was more worried about the TryDequeue though and that is why I added the "virtual" lock concept which required a lot of extra stuff in my implemenation. I was specifically worried about the opposite order of access to the data structures (dictionary then queue in the Enqueue method, but queue then dictionary in the TryDequeue method) But, the more I think about your modified approach the more I like it. And I now think it is because of the reversed access order that it is safe!

Brian Gideon
I am going to see if I can come up with a lock-free implemenation using `ConcurrentQueue` and `ConcurrentDictionary` because it could be a fun challenge. I'll update my answer if I have something that I think is presentable. I *highly* suspect that I will fail :)
Brian Gideon
Damn, solid detective work there. I am *also* quite surprised to learn that the `addValueFactory` delegate may be executed multiple times. But the concurrency issue you've spotted is obviously the more important point. I guess the lesson to be gleaned from this is that any delegates passed to the methods of `ConcurrentDictionary` should really only produce values and not have any side-effects (unless the thread safety of those side effects has been fully considered). Anyway, awesome answer.
Dan Tao
@Brian: I'm almost afraid to present it, given how likely it is that I've overlooked something; but I had an idea for a modification that I *think* (fingers crossed) might resolve the issues you pointed out, without actually changing much code. Care to take a look?
Dan Tao
@Dan: I'll take a look at your modification after lunch. Ponder my attempt which uses a crucial nonexistent "mythical" method. What do you think of it? Is it safe? If that method actually did exist I *think* it might work. Should I lobby Microsoft to add it in the next version of the framework?
Brian Gideon
@Dan: You know what...I *think* your modified solution is safe.
Brian Gideon
@Brian: I had actually started writing a comment in response to your hypothetical solution and was going to say pretty much what you've said in your update: while they look a lot different, I think our ideas end up behaving the same. It seems the crucial point is to make sure that the set acts as a barrier to the queue; that is, the push/pop have to be "inside" the set operations. Since both of our ideas have this characteristic, I believe they will behave identically. (But notice the abundance of the words *think* and *believe* in this conversation!)
Dan Tao