Update: As Brian pointed out, my original idea did indeed have a concurrency issue. This was somewhat obscured by the signature of the ConcurrentDictionary<TKey, TValue>.AddOrUpdate method, which can lull a lazy thinker (like myself) into believing that everything (the set add as well as the queue push) will somehow happen all at once, atomically (i.e., magically).
In retrospect, it was foolish of me to have that expectation. In fact, regardless of how AddOrUpdate is implemented, it should be clear that a race condition exists in my original idea, as Brian pointed out: the push to the queue happens before the add to the set, so the following sequence of events could occur:
- Item pushed to queue (thread A)
- Item popped from queue (thread B)
- Attempt to remove item from set fails, because it isn't there yet (thread B)
- Item added to set (thread A)
The above sequence would result in an item in the set that isn't in the queue, effectively blacklisting that item from the data structure.
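Just to convince myself that the end state really is "in the set but not in the queue," here is a single-threaded walk-through of that interleaving against the raw collections (the thread A/B labels are only annotations and BlacklistRepro is a throwaway name; this isn't a real concurrent repro):
using System.Collections.Concurrent;

class BlacklistRepro
{
    static void Main()
    {
        var queue = new ConcurrentQueue<string>();
        var set = new ConcurrentDictionary<string, bool>();

        queue.Enqueue("x");            // thread A: push happens inside AddOrUpdate's add factory
        string popped;
        queue.TryDequeue(out popped);  // thread B: pops "x"
        bool dummy;
        set.TryRemove("x", out dummy); // thread B: remove fails; "x" isn't in the set yet
        set.TryAdd("x", true);         // thread A: AddOrUpdate finally adds "x" to the set

        // "x" is now in the set with no corresponding queue entry, so any
        // subsequent Enqueue("x") on the combined structure would return false.
    }
}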
Now, I thought about this for a short while, and I'm beginning to think that the following approach may resolve these issues:
public bool Enqueue(T item)
{
    // This should:
    // 1. return true only when the item is first added to the set;
    // 2. subsequently return false as long as the item is in the set
    //    (and it will not be removed until after it's popped).
    if (_set.TryAdd(item, true))
    {
        _queue.Enqueue(item);
        return true;
    }
    return false;
}
Structured this way, _queue.Enqueue is called only by the thread whose TryAdd succeeds, and only after the item is in the set, so duplicate items in the queue should not be a problem. And since the queue operations are "bookended" by the set operations (an item is pushed only after it's added to the set, and popped before it's removed from the set), the problematic sequence of events outlined above should not occur.
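For what it's worth, here's the kind of throwaway stress test I'd run against the class (the original ConcurrentSetQueue<T> from the question below, but with the revised Enqueue above) to look for blacklisted items. It obviously can't prove correctness, and the thread/iteration counts and names are arbitrary:
using System;
using System.Collections.Generic;
using System.Threading;

class SetQueueStressTest
{
    static void Main()
    {
        var sq = new ConcurrentSetQueue<int>(EqualityComparer<int>.Default);
        var threads = new Thread[8];

        for (int t = 0; t < threads.Length; t++)
        {
            int seed = t;
            threads[t] = new Thread(delegate()
            {
                var rng = new Random(seed);
                for (int i = 0; i < 100000; i++)
                {
                    int value = rng.Next(0, 50);
                    if (rng.Next(2) == 0)
                    {
                        sq.Enqueue(value);
                    }
                    else
                    {
                        int popped;
                        sq.TryDequeue(out popped);
                    }
                }
            });
            threads[t].Start();
        }

        foreach (Thread thread in threads)
            thread.Join();

        // Drain whatever is left; after that, every value should be
        // enqueueable again. If an Enqueue returns false here, that value
        // got stuck in the set (i.e., it was "blacklisted").
        int leftover;
        while (sq.TryDequeue(out leftover)) { }

        for (int value = 0; value < 50; value++)
        {
            if (!sq.Enqueue(value))
                Console.WriteLine("Value {0} appears to be blacklisted!", value);
        }
        Console.WriteLine("Done.");
    }
}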
What do people think? Could it be that this resolves the problem? (Like Brian, I am inclined to doubt myself and guess that the answer is No, and I am missing something once again. But hey, it wouldn't be a fun challenge if it were easy, right?)
I have definitely seen similar questions posed here on SO, but surprisingly (considering how .NET-heavy this site is) they all seemed to be for Java.
Essentially, I need a thread-safe set/queue combo class. In other words, it should be a FIFO collection that does not allow duplicates: if an item is already in the queue, subsequent Enqueue calls for that item will return false until the item is popped from the queue.
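To make that contract concrete, here's the sort of usage I have in mind (this relies on the ConcurrentSetQueue<T> class sketched further down; the comparer choice is just an example):
var sq = new ConcurrentSetQueue<string>(StringComparer.Ordinal);

sq.Enqueue("a");          // true:  "a" was not in the collection
sq.Enqueue("a");          // false: "a" is already queued
string item;
sq.TryDequeue(out item);  // true, and item == "a"
sq.Enqueue("a");          // true again: "a" has been popped, so it can be re-queued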
I realize I could implement this quite easily with a simple HashSet<T> and a Queue<T>, with locking in all the necessary places. However, I was interested in accomplishing it with the ConcurrentDictionary<TKey, TValue> and ConcurrentQueue<T> classes from .NET 4.0 (also available as part of the Rx extensions for .NET 3.5, which is what I'm using), which I understand to be somehow lock-free collections*.
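Just to make that lock-based baseline concrete, here's roughly what I mean (a quick sketch with an arbitrary class name; not tuned or tested):
using System.Collections.Generic;

class LockedSetQueue<T>
{
    private readonly HashSet<T> _set;
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _sync = new object();

    public LockedSetQueue(IEqualityComparer<T> comparer)
    {
        _set = new HashSet<T>(comparer);
    }

    public bool Enqueue(T item)
    {
        lock (_sync)
        {
            if (!_set.Add(item))
                return false;       // already queued
            _queue.Enqueue(item);
            return true;
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            if (_queue.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _queue.Dequeue();
            _set.Remove(item);
            return true;
        }
    }
}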
My basic plan was to implement this collection something like this:
class ConcurrentSetQueue<T>
{
    private readonly ConcurrentQueue<T> _queue;
    private readonly ConcurrentDictionary<T, bool> _set;

    public ConcurrentSetQueue(IEqualityComparer<T> comparer)
    {
        _queue = new ConcurrentQueue<T>();
        _set = new ConcurrentDictionary<T, bool>(comparer);
    }

    public bool Enqueue(T item)
    {
        // This should:
        // 1. if the key is not present, enqueue the item and return true
        // 2. if the key is already present, do nothing and return false
        return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
    }

    // Add-value factory: runs when the key is not yet in the dictionary.
    private bool EnqueueFirst(T item)
    {
        _queue.Enqueue(item);
        return true;
    }

    // Update-value factory: runs when the key is already in the dictionary.
    private bool EnqueueSecond(T item, bool dummyFlag)
    {
        return false;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out item))
        {
            // Another thread could come along here, attempt to enqueue, and
            // fail; however, this seems like an acceptable scenario since the
            // item shouldn't really be considered "popped" until it's been
            // removed from both the queue and the dictionary.
            bool flag;
            _set.TryRemove(item, out flag);
            return true;
        }
        return false;
    }
}
Have I thought this through properly? On the surface, I can't see any obvious bugs in this basic idea as I've written it above. But maybe I'm overlooking something. Or maybe the use of a ConcurrentQueue<T> with a ConcurrentDictionary<T, bool> isn't actually a wise choice, for reasons that haven't occurred to me. Or maybe somebody else has already implemented this idea in a battle-proven library somewhere, and I should just use that.
Any thoughts or useful info on this subject would be much appreciated!
*Whether this is strictly accurate, I don't know; but performance tests have indicated to me that, with many consumer threads, they do outperform comparable hand-rolled collections that use locking.