views:

56

answers:

4

I have a thread, that processes incomming messages (endless loop). For this, I use a BlockingQueue (Java), which works as quite nice. Now, I want to add a second processor in the same Class oder method. The problem now is, that in the endless loop i have this part

newIncomming = this.incommingProcessing.take();

This part blocks if the Queue is empty. I'm looking for a solution to process to queues in the same class. The second queue can only processed, it some data is coming in for the first Queue. Is there a way to handle tow blocking queues in the same endless loop?

A: 

BlockingQueue is meant for multiple thread implementations. Instead, use a simple Queue. See this.

wallyk
+1  A: 

Either you need two threads or you need them to share the same blocking queue. (Or you need to use a different structure than blocking queue)

Bill K
A: 

I am not sure what you are trying to do, but if you do not want the thread to block on the queue if it is empty, you can use BlockingQueue.peek() to first check if the queue is empty or not.

pajton
A: 

What I understand from your question I came up with the following

Code snippet

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */

package blockingqueues;

import java.io.BufferedReader;
import java.io.Console;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author alfred
 */
public class BlockingQueues {
    private static String take(BlockingQueue<String> bq) {
        try {
            return bq.take();
        } catch(InterruptedException ie) {
            return null;
        }
    }

    public static void main(String args[]) throws Exception {
        final BlockingQueue<String> b1 = new LinkedBlockingQueue<String>();
        final BlockingQueue<String> b2 = new LinkedBlockingQueue<String>();
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(new Runnable() {
            public void run() {
                while (true) {
                    String results = take(b1);
                    if (results == null) {
                        break;
                    }
                    System.out.println("first: " + results);
                }
            }
        });
        es.execute(new Runnable() {
            public void run() {
                while (true) {
                    String results = take(b2);
                    if (results == null) {
                        break;
                    }
                    System.out.println("second: " + results);
                }
            }
        });
        BufferedReader br = new BufferedReader(
                new InputStreamReader(System.in)
                );
        String input = null;
        System.out.println("type x to quit.");
        while (!"x".equals(input)) {
            input = br.readLine();
            if (input.startsWith("1 ")) {
                // Add something to first blocking queue.
                b1.add(input.substring(2));   
            } else if (input.startsWith("2 ")) {
                // Add something to second blocking queue.
                b2.add(input.substring(2));
            }
        }
        es.shutdownNow();
        es.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("bye!");
        br.close();
    }
}

Executing program:

You can enter text from console to add task to blockingqueue b1 or b2. If your console input starts with a 1 like for example input = "1 hello" then the b1 will process task(print first: hello) else if input starts with a 2 like for example input = "world" then b2 will print second: world.

Alfred