views:

67

answers:

2

I'm using a LinkedBlockingQueue to handle message objects from other threads. For example, I have something like this:

LinkedBlockingQueue<Message> message_queue = new LinkedBlockingQueue<Message>();

public void run(){
    while(true){
        Message m = message_queue.take();
        handle_message(m);
    }
}

When I add a new message from another thread, I call message_queue.put(m). No problem! My question is this:

How do I synchronize the addition of a SET of messages all at once? Let's say I want to do this:

Message[] messages = {/* some list of message objects */};
for (Message m : messages){
    message_queue.put(m);
}

The problem is that another thread could be doing the same thing, and the messages from one thread are not guaranteed to be queued exactly as I intend. Messages from the competing thread can be "interleaved" inbetween (i.e. the actual sequence could end up being A, A, B, B, A, B instead of the intended A, A, A, B, B, B) I know I could put the loop in the "put" example into a "synchronized(message_queue){}" block, but would I also need to put that same block around the call to .take()? And obviously, I can't do that, as it would instantly create a deadlock situation, since take() blocks until a new Message is put(), which cannot happen when locked by synchronization. Can I skip the synchronized block on the take() call, having only a synchronized block on the put loop, and get the desired effect? Thanks!

+2  A: 

Perhaps the object you want to put onto the queue (and take off) would be a group of messages ?

i.e. perhaps you have a object representing an ordered collection of messages. Normally it contains just the one message, but could (in this scenario) contain many. That would maintain atomicity and ordering.

Brian Agnew
+1 bah ... I was still typing my response :( beat me to it
basszero
good idea but it does complicate things if you want to pull off 1 message at a time.
Jason S
+2  A: 

Unfortunately you cannot gain access to the individual put and take locks within LinkedBlockingQueue as they are private.

As you mention you could simply wrap the bulk put operation in a synchronized block, which would make this atomic. If you do this I don't understand why you state that you'd also need to add a synchronized block around the take() operation. I don't believe this second synchronized block would be needed (as LinkedBlockingQueue is thread-safe anyway).

Adamski
agreed: the only ordering issue is whether 2 bulk put operations collide with each other or with a single put operation.
Jason S