Hello. I want one thread that puts values into a queue whenever the queue becomes empty, and that waits as long as the queue is not empty. Here's the code I've tried, but it only prints

Adding new
Taking Value 1
Taking Value 2
Taking Value 3
Taking Value 4

So it is working only one time. What is the problem?

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class SO {
    public String test;
    public String[] list = new String[] {test};

    public static void main(String[] args) {
        new SO();
    }

    public SO() {
        go();
    }

    BlockingQueue<String> qq = new LinkedBlockingQueue<String>();

    class Producer implements Runnable {
        public void run() {
            try {
                while (true) {
                    synchronized (this) {
                        while (qq.size() > 0)
                            wait();

                        System.out.println("Adding new");
                        qq.put("Value 1");
                        qq.put("Value 2");
                        qq.put("Value 3");
                        qq.put("Value 4");
                    }
                }
            } catch (InterruptedException ex) {}
        }
    }

    class Consumer implements Runnable {
        public void run() {
            try {
                while(true) {
                    System.out.println("Taking " + qq.take()+". "+String.valueOf(qq.size())+" left");
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {}
        }
    }

    public void go() {
        Producer p = new Producer();
        Consumer c = new Consumer();

        new Thread(p).start();
        new Thread(c).start();
    }
}
+3  A: 

wait() is never notified.

Stephen Denne
+1  A: 

The wait() will continue forever because you never call notify().

You could wait on the queue and call notify on it when you want the waiting thread to wake up. To do this you would change Producer to read:

    synchronized (qq) {
        while (qq.size() > 0)
            qq.wait();

        System.out.println("Adding new");
        qq.put("Value 1");
        qq.put("Value 2");
        qq.put("Value 3");
        qq.put("Value 4");
    }

And change Consumer to read:

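    while(true) {
        synchronized (qq) {
            // poll() returns null on an empty queue instead of blocking,
            // so the lock on qq is never held while waiting for an item.
            String value = qq.poll();
            if (value != null) {
                System.out.println("Taking " + value + ". " + String.valueOf(qq.size()) + " left");
                qq.notify();
            }
        }
        Thread.sleep(1000);
    }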

As Steve says in his answer, you could also use wait() in the consumer thread, so that it waits until there is something in the queue rather than sleeping. Your code would then become:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SO {
    public String test;
    public String[] list = new String[] { test };

    public static void main(String[] args) {
        new SO();
    }

    public SO() {
        go();
    }

    BlockingQueue<String> qq = new LinkedBlockingQueue<String>();

    class Producer implements Runnable {
        public void run() {
            try {
                while (true) {
                    synchronized (qq) {
                        // Re-check the condition in a loop: wait() can
                        // return spuriously.
                        while (!qq.isEmpty()) {
                            qq.wait();
                        }

                        System.out.println("Adding new");
                        qq.put("Value 1");
                        qq.put("Value 2");
                        qq.put("Value 3");
                        qq.put("Value 4");
                        qq.notify();
                    }
                }
            } catch (InterruptedException ex) {
            }
        }
    }

    class Consumer implements Runnable {
        public void run() {
            try {
                while (true) {
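                    synchronized (qq) {
                        // Wait until there is something to take. Calling
                        // take() on an empty queue here would block while
                        // still holding the lock on qq.
                        while (qq.isEmpty()) {
                            qq.wait();
                        }
                        System.out.println("Taking " + qq.take() + ". "
                                + String.valueOf(qq.size()) + " left");
                        if (qq.isEmpty()) {
                            // Wake the producer so it can refill the queue.
                            qq.notify();
                        }
                    }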
                }
            } catch (InterruptedException ex) {
            }
        }
    }

    public void go() {
        Producer p = new Producer();
        Consumer c = new Consumer();

        new Thread(p).start();
        new Thread(c).start();
    }
}
Richard Miskin
A: 

Since you are using BlockingQueue, you don't have to use synchronized, because BlockingQueue implementations are thread-safe. If you do want to use synchronization, then both threads should synchronize on the same object:

    synchronized (theSameObjectInstance) {
        while (true) {
            while (qq.size() > 0)
                theSameObjectInstance.wait();

            System.out.println("Adding new");
            qq.put("Value 1");
            ...

            theSameObjectInstance.notifyAll();
        }
    }

The consumer's loop must likewise be wrapped in synchronized(theSameObjectInstance) so that it can wait and be notified on that object. The consumer should also wait() somewhere, e.g. while qq is empty.
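
To make the "same object" idea concrete, here is a minimal sketch, not a definitive implementation. The class name RefillWhenEmpty, the run(rounds) method, the lock object and the log list are all made up for illustration. It also assumes a plain LinkedList guarded by the shared lock instead of a BlockingQueue, and a fixed number of rounds so that the program terminates.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical demo class: the producer refills only when the queue is empty.
final class RefillWhenEmpty {

    /** Runs the given number of refill rounds and returns the ordered log. */
    static List<String> run(int rounds) throws InterruptedException {
        final Object lock = new Object();            // the one shared monitor
        final Queue<String> queue = new LinkedList<>();
        final List<String> log = new ArrayList<>();  // only touched under lock
        final int batch = 4;

        Thread producer = new Thread(() -> {
            try {
                for (int r = 0; r < rounds; r++) {
                    synchronized (lock) {
                        // Loop guards against spurious wakeups.
                        while (!queue.isEmpty()) {
                            lock.wait();
                        }
                        log.add("Adding new");
                        for (int i = 1; i <= batch; i++) {
                            queue.add("Value " + i);
                        }
                        lock.notifyAll();            // wake the consumer
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int taken = 0; taken < rounds * batch; taken++) {
                    synchronized (lock) {
                        while (queue.isEmpty()) {
                            lock.wait();
                        }
                        log.add("Taking " + queue.remove());
                        if (queue.isEmpty()) {
                            lock.notifyAll();        // wake the producer
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : run(2)) {
            System.out.println(line);
        }
    }
}
```

Both threads wait and notify on the same lock object, and each re-checks its condition in a while loop, so "Adding new" can only appear when the queue has been fully drained.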

Steve
A: 

Your requirements state that you intend to put values into the queue when it's empty -- several values, I take it, rather than just one. If your requirements change slightly to say that you put one item in the queue and wait until it's consumed before putting another, then you'd be ripe for using java.util.concurrent.Exchanger.

Its behavior is similar to a BlockingQueue of depth one, but it does a little more: It passes an object both ways, where each participant is both a "producer" and a "consumer". Hence, an Exchanger won't accept a "producer"'s offering of an item until the "consumer" is also ready to offer an item back. It's not "fire and forget" for the "producer"; the production and consumption timing is interlocked. This prevents an actual producer from flooding a consumer's work queue -- again, like BlockingQueue -- but it also stalls the producer until the consumer has completed the last round of work.

In your case, the consumer might not have anything useful to return to the producer. Regardless, you can form a protocol between the participants. When the producer wants the consumer thread to shut down, it can offer a null value. Once the consumer accepts the null value, the producer might do one more round of exchange to finalize the shutdown request and collect any final outcome from the consumer.
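
As a rough sketch of that protocol (assuming one item per exchange and null as the shutdown signal, as described above), something like the following could work. The class name ExchangerHandoff, the run method and the "ready"/"done" reply strings are invented for the example; only Exchanger.exchange() comes from the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Exchanger;

// Hypothetical demo class: hands items to a consumer one at a time.
final class ExchangerHandoff {

    /** Returns the items the consumer received, in order. */
    static List<String> run(List<String> items) throws InterruptedException {
        final Exchanger<String> exchanger = new Exchanger<>();
        final List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                String reply = "ready";              // placeholder offering
                while (true) {
                    // Blocks until the producer offers its next item.
                    String item = exchanger.exchange(reply);
                    if (item == null) {
                        break;                       // null means shut down
                    }
                    received.add(item);
                    reply = "done " + item;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String item : items) {
            // Blocks until the consumer has finished the previous item
            // and is ready to exchange again.
            exchanger.exchange(item);
        }
        exchanger.exchange(null);                    // request shutdown
        consumer.join();                             // makes 'received' visible
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [Value 1, Value 2, Value 3]
        System.out.println(run(Arrays.asList("Value 1", "Value 2", "Value 3")));
    }
}
```

Here the producer ignores the consumer's replies; a real protocol could use them to carry results back, including a final outcome in the last exchange.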

seh