I have a requirement to manipulate two queues atomically and am not sure what the correct synchronization strategy is. This is what I was trying:

public class transfer {

  BlockingQueue<Object> firstQ;
  BlockingQueue<Object> secondQ;

  public void moveToSecond() throws InterruptedException {
    synchronized (this) {
      Object a = firstQ.take();
      secondQ.put(a);
    }
  }

  public void moveToFirst() throws InterruptedException {
    synchronized (this) {
      Object a = secondQ.take();
      firstQ.put(a);
    }
  }
}

Is this the correct pattern? In the method moveToSecond(), if firstQ is empty, the method will wait on firstQ.take(), but it still holds the lock on this object. This will prevent moveToFirst() from getting a chance to execute.

I am confused about lock release during a wait: does the thread release all locks (both the lock on this and the BlockingQueue's internal lock)? What is the correct pattern to provide atomicity when dealing with multiple blocking queues?

A: 

You should use the Lock mechanism from java.util.concurrent.locks, like this:

Lock lock = new ReentrantLock();
....
lock.lock();
try {
    secondQ.put(firstQ.take());
} finally {
    lock.unlock();
}

Do the same for firstQ.put(secondQ.take()), using the same lock object.

There is no need to use the low-level wait/notify methods on the Object class anymore, unless you are writing new concurrency primitives.

JesperE
Lock was recommended in Java 1.5 over wait() due to efficiency reasons. In Java 1.6 they have nearly the same level of efficiency. Future VM optimizations will likely make wait/notify implementations faster.
Tim Bender
But lock is much easier to use correctly.
JesperE
How does this solve the OP's problem of the first queue being empty? The code will still block indefinitely on the take() method.
Adamski
It doesn't. ;-)
JesperE
I don't get it. How come it doesn't block?
Seun Osewa
Hm. This is an old question, but I think that my smiley indicates that my solution will deadlock as well.
JesperE
A: 

In your code, while the thread is blocked on BlockingQueue.take() it is holding on to the lock on this. The lock isn't released until either the code leaves the synchronized block or this.wait() is called.
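
To make the point above concrete, here is a small sketch that parks one thread in take() inside a synchronized block and then observes what happens to a second thread that wants the same monitor. The class name HeldMonitorDemo and its helper methods are made up for this illustration and are not part of the question's code. The second thread would normally be reported as BLOCKED, because take() only releases the queue's own internal lock while waiting, not the monitor the caller holds.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HeldMonitorDemo {

    // Polls a thread's state until it matches or a 5 second deadline passes.
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (t.getState() != wanted && System.nanoTime() < deadline) {
            Thread.yield();
        }
        return t.getState();
    }

    // Returns the state of a second thread that tries to enter the monitor
    // while the first thread is parked in take().
    static Thread.State stateOfSecondThread() {
        final Object monitor = new Object();
        final BlockingQueue<Object> emptyQ = new LinkedBlockingQueue<>();

        Thread taker = new Thread(() -> {
            synchronized (monitor) {
                try {
                    emptyQ.take(); // parks here, still owning the monitor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread other = new Thread(() -> {
            synchronized (monitor) {
                // only reachable once taker has left its synchronized block
            }
        });

        taker.start();
        awaitState(taker, Thread.State.WAITING); // taker is now inside take()
        other.start();
        Thread.State observed = awaitState(other, Thread.State.BLOCKED);

        emptyQ.offer("wake up"); // lets taker finish so both threads can exit
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("second thread is " + stateOfSecondThread());
    }
}
```

The same thing happens with the two move methods in the question: whichever one blocks in take() first keeps the other one out.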

Here I assume that moveToFirst() and moveToSecond() should block, and that your class controls all access to the queues.

private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue<Object>();
private final Semaphore firstSignal = new Semaphore(0);
private final BlockingQueue<Object> secondQ = new LinkedBlockingQueue<Object>();
private final Semaphore secondSignal = new Semaphore(0);

private final Object monitor = new Object();

public void moveToSecond() throws InterruptedException {
  int moved = 0;
  while (moved == 0) {

    // block until someone adds to the queue
    firstSignal.acquire();

    // attempt to move an item from one queue to another atomically
    synchronized (monitor) {
      moved = firstQ.drainTo(secondQ, 1);
    }
  }
}

public void putInFirst(Object object) throws InterruptedException {
  firstQ.put(object);

  // notify any blocking threads that the queue has an item
  firstSignal.release();
}

You would have similar code for moveToFirst() and putInSecond(). The while loop is only needed if some other code might remove items from the queue. If you want the method that removes from the queue to wait for pending moves, it should acquire a permit from the semaphore, and the semaphore should be created as a fair Semaphore, so that the first thread to call acquire() gets released first:

firstSignal = new Semaphore(0, true);
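
Putting the pieces above together, a symmetric version might look like the following sketch. The class name SignalledTransfer and the size accessors are invented for illustration. It also makes one assumption the snippet above does not show: after a successful move it releases the destination queue's semaphore, so that the moved item can later be moved back. Both queues are assumed to be unbounded, so drainTo() never fails for lack of space.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class SignalledTransfer {
    private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue<>();
    private final BlockingQueue<Object> secondQ = new LinkedBlockingQueue<>();
    // Fair semaphores: one permit per item announced in the matching queue.
    private final Semaphore firstSignal = new Semaphore(0, true);
    private final Semaphore secondSignal = new Semaphore(0, true);
    private final Object monitor = new Object();

    public void putInFirst(Object o) throws InterruptedException {
        firstQ.put(o);
        firstSignal.release();
    }

    public void putInSecond(Object o) throws InterruptedException {
        secondQ.put(o);
        secondSignal.release();
    }

    public void moveToSecond() throws InterruptedException {
        move(firstQ, firstSignal, secondQ, secondSignal);
    }

    public void moveToFirst() throws InterruptedException {
        move(secondQ, secondSignal, firstQ, firstSignal);
    }

    private void move(BlockingQueue<Object> from, Semaphore fromSignal,
                      BlockingQueue<Object> to, Semaphore toSignal)
            throws InterruptedException {
        int moved = 0;
        while (moved == 0) {
            // Wait for an item outside the monitor, so the opposite
            // move is never locked out while this thread is waiting.
            fromSignal.acquire();
            synchronized (monitor) {
                moved = from.drainTo(to, 1);
            }
        }
        // Assumption of this sketch: announce the item on the other side.
        toSignal.release();
    }

    public int firstSize() { return firstQ.size(); }
    public int secondSize() { return secondQ.size(); }
}
```

Because the blocking happens on the semaphore rather than inside the synchronized block, the monitor is only ever held for the short drainTo() call.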

If you don't want moveToFirst() to block you have a few options

  1. Have the method do its work in a Runnable sent to an Executor
  2. Pass a timeout to moveToFirst() and use BlockingQueue.poll(long, TimeUnit)
  3. Use BlockingQueue.drainTo(secondQ, 1) and modify moveToFirst() to return a boolean to indicate if it was successful.

For the above three options, you wouldn't need the semaphore.
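
As a rough illustration of options 2 and 3, the sketch below shows one way they could look. The class name NonBlockingTransfer, the method name tryMoveToFirst() and the size accessors are hypothetical, and both queues are assumed to be unbounded so that add() cannot fail. Note that the timed variant holds the monitor for up to the timeout, so putInSecond() deliberately does not synchronize on it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NonBlockingTransfer {
    // Assumed unbounded, so add() below never throws for lack of space.
    private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue<>();
    private final BlockingQueue<Object> secondQ = new LinkedBlockingQueue<>();
    private final Object monitor = new Object();

    public void putInSecond(Object o) {
        secondQ.add(o);
    }

    // Option 3: move at most one item and report whether anything moved.
    public boolean tryMoveToFirst() {
        synchronized (monitor) {
            return secondQ.drainTo(firstQ, 1) == 1;
        }
    }

    // Option 2: wait at most the given time for an item to appear.
    // The monitor is held while waiting, but only for a bounded time.
    public boolean moveToFirst(long timeout, TimeUnit unit)
            throws InterruptedException {
        synchronized (monitor) {
            Object item = secondQ.poll(timeout, unit);
            if (item == null) {
                return false; // timed out, nothing was removed
            }
            firstQ.add(item);
            return true;
        }
    }

    public int firstSize() { return firstQ.size(); }
    public int secondSize() { return secondQ.size(); }
}
```

A caller would then check the boolean result and decide for itself whether to retry, give up, or do other work in the meantime.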

Finally, I question the need to make the move atomic. If multiple threads are adding to or removing from the queues, then an observing thread wouldn't be able to tell whether moveToFirst() was atomic.

NamshubWriter
+2  A: 

You are using the correct approach using a common mutex to synchronize between both queues. However, to avoid the situation you describe with the first queue being empty I'd suggest reimplementing moveToFirst() and moveToSecond() to use poll() rather than take(); e.g.

public boolean moveToFirst() throws InterruptedException {
  boolean success;

  // Synchronize on simple mutex; could use a Lock here but probably
  // not worth the extra dev. effort.
  synchronized (queueLock) {
    // Will return immediately, returning null if the queue is empty.
    Object o = secondQ.poll();

    if (o != null) {
      // Put could block if the queue is full.  If you're using a bounded
      // queue you could use add(Object) instead to avoid any blocking but
      // you would need to handle the exception somehow.
      firstQ.put(o);
      success = true;
    } else {
      success = false;
    }
  }

  return success;
}
Adamski
+1  A: 

Another failure condition you didn't mention is if firstQ is not empty but secondQ is full, the item will be removed from firstQ but there will be no place to put it.

So the only correct way is to use poll and offer with timeouts and code to return things to the way they were before any failure (important!), then retry after a random time until both poll and offer are successful.

This is an optimistic approach: efficient in normal operation but quite inefficient when deadlocks are frequent (the average latency depends on the timeout chosen).
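
A minimal sketch of that idea follows, under several assumptions that are not in the answer itself: the class name OptimisticTransfer and all its method names are made up, the queues are bounded LinkedBlockingDeque instances so that a rolled-back item can be returned to the head with offerFirst(), and the timeout is at least 1 ms. Between the poll and the offer the item is briefly in neither queue, and the rollback can itself fail if a producer refills the source queue in the meantime; the sketch simply throws in that case.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class OptimisticTransfer {
    private final BlockingDeque<Object> firstQ;
    private final BlockingDeque<Object> secondQ;

    public OptimisticTransfer(int capacity) {
        firstQ = new LinkedBlockingDeque<>(capacity);
        secondQ = new LinkedBlockingDeque<>(capacity);
    }

    public boolean offerToFirst(Object o) { return firstQ.offer(o); }
    public Object pollSecond() { return secondQ.poll(); }
    public int firstSize() { return firstQ.size(); }
    public int secondSize() { return secondQ.size(); }

    // One attempt. Returns true if an item moved; otherwise the queues
    // are left as they were before the call.
    public boolean tryMoveToSecond(long timeoutMillis)
            throws InterruptedException {
        Object item = firstQ.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (item == null) {
            return false; // source stayed empty for the whole timeout
        }
        if (secondQ.offer(item, timeoutMillis, TimeUnit.MILLISECONDS)) {
            return true;
        }
        // Destination stayed full: put the item back where it came from.
        if (!firstQ.offerFirst(item)) {
            throw new IllegalStateException("could not restore item");
        }
        return false;
    }

    // Retries with a random pause until one item has been moved.
    public void moveToSecond(long timeoutMillis) throws InterruptedException {
        while (!tryMoveToSecond(timeoutMillis)) {
            long pause = ThreadLocalRandom.current()
                    .nextLong(1, timeoutMillis + 1);
            Thread.sleep(pause);
        }
    }
}
```

The random pause is there so that two threads moving items in opposite directions do not keep retrying in lockstep.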

Seun Osewa