In your code, while the thread is blocked on BlockingQueue.take()
it is holding on to the lock on this
. The lock isn't released until either the code leaves the synchronized block or this.wait()
is called.
Here I assume that moveToFirst()
and moveToSecond()
should block, and that your class controls all access to the queues.
private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue();
private final Semaphore firstSignal = new Semaphore(0);
private final BlockingQueue<Object> secondQ = LinkedBlockingQueue();
private final Semaphore secondSignal = new Semaphore(0);
private final Object monitor = new Object();
public void moveToSecond() {
int moved = 0;
while (moved == 0) {
// bock until someone adds to the queue
firstSignal.aquire();
// attempt to move an item from one queue to another atomically
synchronized (monitor) {
moved = firstQ.drainTo(secondQ, 1);
}
}
}
public void putInFirst(Object object) {
firstQ.put(object);
// notify any blocking threads that the queue has an item
firstSignal.release();
}
You would have similar code for moveToFirst()
and putInSecond()
. The while
is only needed if some other code might remove items from the queue. If you want the method that removes on the queue to wait for pending moves, it should aquire a permit from the semaphore, and the semaphore should be created as a fair Semaphore, so the first thread to call aquire
will get released first:
firstSignal = new Semaphore(0, true);
If you don't want moveToFirst()
to block you have a few options
- Have the method do do its work in a
Runnable
sent to an Executor
- Pass a timeout to
moveToFirst()
and use BlockingQueue.poll(int, TimeUnit)
- Use
BlockingQueue.drainTo(secondQ, 1)
and modify moveToFirst()
to return a boolean to indicate if it was successful.
For the above three options, you wouldn't need the semaphore.
Finally, I question the need to make the move atomic. If multiple threads are adding or removing from the queues, then an observing queue wouldn't be able to tell whether moveToFirst()
was atomic.