Hi everybody!

I have this scenario:

class MyClass {
   Producer p;
   Consumer c;
    public static void main(String[] args){
        BlockingQueue q = new LinkedBlockingQueue();
        p = new Producer(q);
        c = new Consumer(q);
        Thread t = new Thread(p);
        t.start();
        new Thread(c).start();
        while (true) {
            if (!p.getContinuer()) {
                c.setContinuer(false);
                System.out.println("here:"+p.getContinuer().toString());
                break;
            }
        }
        System.out.println("finish all");
     }
}

class Producer implements Runnable {
private final BlockingQueue queue;
private AtomicBoolean continuer = new AtomicBoolean(true);
   public Boolean getContinuer() {
      return continuer.get();
   }
    @Override
    public void run() {
      while(true){
        //open socket
        //read data from socket
        queue.put(data);
        if(end){
            System.out.println("Shutting down Producer");
            continuer.getAndSet(false);
        }
      }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    private static AtomicBoolean continuer = new AtomicBoolean(true);

    public void setContinuer(Boolean continuerr) {
        continuer = new AtomicBoolean(continuerr);
    }

    public Boolean getContinuer() {
        return continuer.get();
    }

    @Override
    public void run() {
          while (getContinuer()) {
             //Do some work
             consume(queue.take());
          }
          System.out.println("shut down Consumer");
    }
}

This is what I'm getting:

Shutting down Producer

here:false

finish all

That means the consumer is still working and its variable "continuer" isn't updated.

I also saw this and this post and tried what they suggest, but nothing changed.

What's my problem?

EDIT: I changed my code, and apparently the consumer is blocked (waiting because no elements are present in the queue) when trying to read data from the BlockingQueue (see consume(queue.take()); in the Consumer class).

+3  A: 

Well,

public void setContinuer(Boolean continuerr) {
    continuer = new AtomicBoolean(continuerr);
}

Looks wrong: it replaces the AtomicBoolean object instead of updating its value. Why don't you write

public void setContinuer(boolean continuerr) {
    continuer.set(continuerr);
}
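
As a minimal sketch of that suggestion: the flag is a final AtomicBoolean that is only ever updated with set(). StoppableWorker and runDemo are hypothetical names used for illustration, and the loop body is a stand-in for real work rather than the asker's Consumer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker for illustration; it only spins on the flag.
class StoppableWorker implements Runnable {
    // final: the flag object is created once and never replaced
    private final AtomicBoolean continuer = new AtomicBoolean(true);

    public void setContinuer(boolean value) {
        continuer.set(value); // update the existing object in place
    }

    public boolean getContinuer() {
        return continuer.get();
    }

    @Override
    public void run() {
        while (getContinuer()) {
            Thread.yield(); // stand-in for real work
        }
    }

    // Starts a worker, asks it to stop, and reports whether it terminated.
    static boolean runDemo() {
        StoppableWorker worker = new StoppableWorker();
        Thread t = new Thread(worker);
        t.start();
        worker.setContinuer(false);
        try {
            t.join(2000); // wait up to two seconds for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + runDemo());
    }
}
```

Because every reader and writer goes through the same AtomicBoolean, the write made by set() is visible to the worker's next get().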

Furthermore, if the end variable is not volatile, its value may be cached per thread, so a change made by one thread might never become visible to another.


We will need to see more code (or at least code that compiles), because this code works as expected:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; // not covered by the wildcard import above

public class MyClass {
    static Producer p;
    static Consumer c;

    public static void main(String[] args) {
        BlockingQueue q = new LinkedBlockingQueue();
        p = new Producer();
        c = new Consumer();
        Thread t = new Thread(p);
        t.start();
        new Thread(c).start();
        while (true) {
            if (!p.getContinuer()) {
                c.setContinuer(false);
                break;
            }
        }
        System.out.println("finish all");
    }
}

class Producer implements Runnable {
    private boolean end = true;
    private BlockingQueue queue;
    private AtomicBoolean continuer = new AtomicBoolean(true);

    public boolean getContinuer() {
        return continuer.get();
    }

    @Override
    public void run() {
        while (true) {
            // open socket
            // read data from socket
            if (end) {
                System.out.println("Shutting down Producer");
                continuer.getAndSet(false);
                break;
            }
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue queue;
    private static AtomicBoolean continuer = new AtomicBoolean(true);

    public void setContinuer(Boolean continuerr) {
        continuer = new AtomicBoolean(continuerr);
    }

    public Boolean getContinuer() {
        return continuer.get();
    }

    @Override
    public void run() {
        while (getContinuer()) {
            // Do some work
        }
        System.out.println("shut down Consumer");
    }
}

It prints

Shutting down Producer
finish all
shut down Consumer

Regarding your edit:

i changed my code, and apparently the consumer is blocked (waiting) when trying to read data from the BlockingQueue (see consume(queue.take()); in the consumer class).

After c.setContinuer(false); you should call consumerThread.interrupt() (keep a reference to the consumer's Thread for this). The consumer thread, which is blocked in queue.take(), will then throw an InterruptedException (which you may ignore), exit the loop, and terminate gracefully.
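
A rough sketch of the interrupt approach, under some assumptions: InterruptDemo, its nested Consumer, the finished field, and runDemo are hypothetical names, and the queue carries plain strings instead of socket data.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class InterruptDemo {
    // Hypothetical consumer; "finished" exists only so the demo can observe the exit.
    static class Consumer implements Runnable {
        private final BlockingQueue<String> queue;
        volatile boolean finished = false;

        Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String item = queue.take(); // blocks while the queue is empty
                    System.out.println("consumed " + item);
                }
            } catch (InterruptedException e) {
                // interrupted while blocked in take(): leave the loop
            }
            finished = true;
            System.out.println("shut down Consumer");
        }
    }

    // Starts a consumer, lets it block on an empty queue, then interrupts it.
    static boolean runDemo() {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        Consumer c = new Consumer(q);
        Thread consumerThread = new Thread(c);
        consumerThread.start();
        try {
            q.put("a");
            Thread.sleep(100);          // give the consumer time to drain the queue and block
            consumerThread.interrupt(); // wakes the thread blocked in take()
            consumerThread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return c.finished && !consumerThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("consumer stopped: " + runDemo());
    }
}
```

The try block wraps the whole loop, so the exception raised by take() ends the loop without needing a separate flag check.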

aioobe
Thank you for the answer, but this didn't work. And no, end is not cached, because you can see that the thread finishes its work in MyClass and Producer.
mohamida
Sorry, but I found the problem (I changed the post).
mohamida
Updated my answer.
aioobe
A: 

Like aioobe said, you're creating a new AtomicBoolean object every time you set it. That's bad.

Worse still, you have discrete continuer variables for the Producer and the Consumer. So if you set the Producer's boolean to false, why would the Consumer be affected? Did you mean to say Consumer.getAndSet(false)? The code seems a bit convoluted.

EboMike
Have a look at his while-loop in the main-thread.
aioobe
I have this line "c.setContinuer(false);" in MyClass
mohamida
Oh goodness ...
EboMike
A: 

Have you tested whether

if (!p.getContinuer())

is taken? getContinuer for producer is not defined in your example. I would recommend inserting a debugging statement after

c.setContinuer(false);

to get the value of c.getContinuer()

Edit: You should use one AtomicBoolean for producer and consumer, as the other answers suggest.
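
One possible shape for a single shared flag, as a sketch only: SharedFlagDemo and runDemo are hypothetical names, the producer puts three integers instead of reading a socket, and the consumer uses poll with a timeout (a substitution for take()) so that it can re-check the flag instead of blocking forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SharedFlagDemo {
    // Runs one producer and one consumer that share a single flag;
    // returns how many items the consumer processed.
    static int runDemo() {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final AtomicBoolean continuer = new AtomicBoolean(true); // the one shared flag
        final AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                queue.add(i); // stand-in for data read from the socket
            }
            continuer.set(false); // signal: nothing more will be produced
        });

        Thread consumer = new Thread(() -> {
            try {
                // keep going while the producer is active or items remain
                while (continuer.get() || !queue.isEmpty()) {
                    Integer item = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        consumed.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("items consumed: " + runDemo());
    }
}
```

The producer clears the flag only after its last add, so once the consumer sees false it just has to drain whatever is still queued.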

kasten
Sorry, I forgot it (I changed the code). c.getContinuer() contains false (I changed my prints as well). And as you can see in my code, I'm already using AtomicBoolean.
mohamida