Maybe this is a silly question, but I cannot seem to find an obvious answer.

I need a concurrent FIFO queue that contains only unique values. Attempting to add a value that already exists in the queue should simply be ignored. If not for the thread-safety requirement, this would be trivial. Is there a data structure in Java, or a code snippet somewhere online, that exhibits this behavior?

+1  A: 

There's not a built-in collection that does this. There are some concurrent Set implementations that could be used together with a concurrent Queue.

For example, an item is added to the queue only after it was successfully added to the set, and each item removed from the queue is also removed from the set. In this case, the contents of the queue, logically, are really whatever is in the set; the queue is just used to track the order and to provide an efficient poll() and, if you need blocking, the take() operation found only on a BlockingQueue.
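
A minimal sketch of that pairing, assuming Java 8+ (for `ConcurrentHashMap.newKeySet()`) and an unbounded `LinkedBlockingQueue`. The class name `UniqueBlockingQueue` is made up for illustration; it is not a JDK or library class.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper: the set decides membership, the queue tracks order.
final class UniqueBlockingQueue<E> {
    private final Set<E> members = ConcurrentHashMap.newKeySet();
    private final BlockingQueue<E> order = new LinkedBlockingQueue<>();

    /** Returns false and changes nothing if the element is already queued. */
    boolean offer(E e) {
        // add() is atomic on this set, so only one thread wins per value.
        if (!members.add(e)) {
            return false;
        }
        // Assumes an unbounded queue, so this offer does not fail.
        return order.offer(e);
    }

    /** Blocks until an element is available, then releases it from the set. */
    E take() throws InterruptedException {
        E e = order.take();
        members.remove(e);
        return e;
    }

    /** Non-blocking variant; returns null when the queue is empty. */
    E poll() {
        E e = order.poll();
        if (e != null) {
            members.remove(e);
        }
        return e;
    }
}
```

With a bounded queue the `offer` path would need to undo the set insertion when the queue rejects the element; the sketch sidesteps that by assuming no capacity limit.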

erickson
One of my implementations used a LinkedHashSet so that I only had one data structure and could depend on the order. However, the algorithm behind the ConcurrentQueue is quite a bit more sophisticated than using synchronization locks, and I was wondering if there was a performant collection that had the additional uniqueness constraint.
Ambience
@Ambience - There isn't such a collection. My technique allows you to use a concurrent `Queue` (either `LinkedBlockingQueue` if you need `take()` or `ConcurrentLinkedQueue` if you only need `poll()`), preserving FIFO ordering, while adding `Set`-like uniqueness.
erickson
+4  A: 

A java.util.concurrent.ConcurrentLinkedQueue gets you most of the way there.

Wrap the ConcurrentLinkedQueue with your own class that checks for the uniqueness of an add. Your code has to be thread safe.

Gilbert Le Blanc
Once wrapped, it might not be thread-safe anymore, even if it is based on a `ConcurrentLinkedQueue`.
finnw
@finnw: Duly noted. I updated my answer.
Gilbert Le Blanc
My first-pass implementation does pretty much exactly this, but I am worried about the cost of invoking .contains() on a linked-list-backed queue. Also, synchronizing the queue methods completely negates the benefits of the underlying algorithm of ConcurrentLinkedQueue.
Ambience
@Ambience: Based on the Javadoc for ConcurrentLinkedQueue, I'm guessing that the contains() method takes O(n) time. The only method you should have to synchronize in your ConcurrentSetQueue is the add method. The other methods are already thread-safe in the ConcurrentLinkedQueue.
Gilbert Le Blanc
+2  A: 

If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.

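import com.google.common.base.Preconditions;
import com.google.common.collect.ForwardingSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {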
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

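  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    this.map = Preconditions.checkNotNull(map);
  }

  // Current Guava's ForwardingSet takes no constructor argument;
  // the backing set is supplied by overriding delegate() instead.
  @Override protected Set<E> delegate() {
    return map.keySet();
  }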

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

All is not sunshine with this approach. We have no decent way to select a head element other than using the backing map's entrySet().iterator().next(), the result being that the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention.

Note: this code uses Guava in a few places.

Kevin Bourrillion
How does this preserve the order of the `Queue`? The iteration order is dependent on the implementation of the backing map.
erickson
Preserve what order? I don't see any requirement about order, unless it's assumed that he wants a *FIFO* queue... I've added a comment to ask.
Kevin Bourrillion
+2  A: 

I would use a synchronized LinkedHashSet until there was enough justification to consider alternatives. The primary benefit that a more concurrent solution could offer is lock splitting.
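
For reference, the synchronized baseline might look roughly like the sketch below. The class name `SynchronizedUniqueQueue` is invented for illustration, and a single lock guards every operation.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

// Hypothetical baseline: one lock, one data structure.
final class SynchronizedUniqueQueue<E> {
    // LinkedHashSet iterates in insertion order; re-adding an existing
    // element is a no-op and does not change its position.
    private final LinkedHashSet<E> items = new LinkedHashSet<>();

    /** Returns false if the element was already present. */
    synchronized boolean offer(E e) {
        return items.add(e);
    }

    /** Removes and returns the oldest element, or null if empty. */
    synchronized E poll() {
        Iterator<E> it = items.iterator();
        if (!it.hasNext()) {
            return null;
        }
        E head = it.next();
        it.remove();
        return head;
    }

    synchronized int size() {
        return items.size();
    }
}
```

Every caller contends on the same monitor, which is the cost the more concurrent designs try to avoid.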

The simplest concurrent approach would be a ConcurrentHashMap (acting as a set) and a ConcurrentLinkedQueue. The ordering of operations would provide the desired constraint. An offer() would first perform a CHM#putIfAbsent() and, if successful, insert into the CLQ. A poll() would take from the CLQ and then remove the element from the CHM. This means an entry is considered to be in our queue if it is in the map; the CLQ merely provides the ordering. The performance could then be adjusted by increasing the map's concurrencyLevel. If you can tolerate some additional raciness, then a cheap CHM#get() could act as a reasonable precondition (but it can suffer from being a slightly stale view).
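
As a rough sketch of that ordering of operations (the class name `ConcurrentUniqueQueue` is hypothetical, and the default map constructor is used rather than a tuned concurrencyLevel):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical wrapper: the map decides membership, the queue keeps FIFO order.
final class ConcurrentUniqueQueue<E> {
    private final ConcurrentHashMap<E, Boolean> present = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<E> order = new ConcurrentLinkedQueue<>();

    /** Returns false if the element is already in the queue. */
    boolean offer(E e) {
        // An optional, racy pre-check such as present.get(e) could go here.
        // putIfAbsent is the atomic gate: exactly one thread sees null.
        if (present.putIfAbsent(e, Boolean.TRUE) != null) {
            return false;
        }
        return order.offer(e);
    }

    /** Removes and returns the head, or null if the queue is empty. */
    E poll() {
        E e = order.poll();
        if (e != null) {
            // Remove from the map only after leaving the queue, so the
            // same value cannot be in the CLQ twice.
            present.remove(e);
        }
        return e;
    }

    boolean contains(Object o) {
        return present.containsKey(o);
    }
}
```

Note the short window in offer() where an element is in the map but not yet in the CLQ: contains() may report true while a concurrent poll() still returns null.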

Ben Manes
+1  A: 

What do you mean by a concurrent queue with Set semantics? If you mean a truly concurrent structure (as opposed to a thread-safe structure) then I would contend that you are asking for a pony.

What happens, for instance, if you call put(element) and detect that the element is already there, but it is then immediately removed? For instance, what does it mean in your case if offer(element) || queue.contains(element) returns false?

These kinds of things often need to be thought about slightly differently in a concurrent world, as nothing is quite what it seems unless you stop the world (lock it down). Otherwise you are usually looking at something in the past. So, what are you actually trying to do?

Jed Wesley-Smith
Interesting observation, if not an answer =) Are you below the commenting point threshold?
Ambience
I was :-) Sorry about that.
Jed Wesley-Smith