The problem: class C decides whether B may add an A back to queueOfA. How can I make sure that, when A notifies C, C's update runs before B's queueOfA becomes empty? B runs so fast that it may remove every A, leaving queueOfA empty before C performs the update, which can cause C's run method to reach its end.

Please help me out!

class A extends Observable implements Runnable {
    //...
    public void run() {
        //...
        if (goBackToTheQueueAgain) {
            setChanged();
            notifyObservers(); // notify C so that it lets B add this A back
        }
        //...
    }
}

class B implements Runnable {
    Queue<A> queueOfA; // initially, queueOfA contains several A's
    //...
    public void run() {
        //...
        A retrieved = queueOfA.remove();
        Thread aThread = new Thread(retrieved);
        aThread.start();
        //...
    }
}

class C implements Observer, Runnable {
    B b; // initially, C contains an instance of B
    //...
    public void update(Observable o, Object arg) {
        if (o instanceof A) {
            A a = (A) o;
            b.queueOfA.add(a);
        }
    }
    //...
    public void run() {
        //...
        Thread bThread = new Thread(b);
        bThread.start();
        while (!b.queueOfA.isEmpty()) {
            // loop for as long as queueOfA is not empty
        }
        //...
    }
}
+4  A: 

The problem is not that you have to notify the update as soon as possible - you cannot guarantee that. For example, consider that you have just one element in the queue to start with. That gets removed from the queue and then the queue is empty, yet the element is still being processed. The A thread can take as long as it likes - C should not terminate, even though the queue is empty.

The fix is that C should wait until all A's have been fully processed, that is, until each A has reached the end of its run method without being put back on the queue. This can be done using a countdown latch. Initially set the latch to the size of the queue (the number of A's) and decrement it each time an A is fully processed. C will exit only when the latch reaches zero.

C looks something like this

CountDownLatch latch;

void run(){
   //...
   this.latch = new CountDownLatch(b.queueOfA.size());
   Thread bThread = new Thread(b);
   bThread.start();
   latch.await();
   //... catch InterruptedException etc.
}

void notifyDone(A a) {
    this.latch.countDown();
}

And A's run method is

void run(){
     //...
     C c = ...; // passed in from somewhere
     if ( goBackToTheQueueAgain == true ){
         setChanged();
         notifyObservers();//notify C that to let B add itself back
     }
     else
         c.notifyDone(this);
}

A needs a reference to C, so that it can notify directly that it is done.

Personally, I would not use Observer/Observable here, since your notification has a specific meaning: that the item can be requeued. The observer pattern is best suited to notifying of changes of state, where that state is available from the instance being observed. That's not the case here, and there is no real gain from such abstract decoupling. If you want to keep it abstract and not couple A to C, you can introduce an additional interface, e.g.

interface QueueHandler<T>
{
    void requeue(T t);
    void completed(T t);
}

and implement this on C, and pass it to each A instead of the current Observer interface. But equally, the A and C classes are quite tightly coupled, so you could also just pass C into A and let it notify directly.
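As a rough sketch of how the interface and the latch could fit together: the names Task (standing in for A), Coordinator (standing in for C, with the role of B folded into its dispatch method) and the runs counter are made up for illustration, and the dispatcher loop using BlockingQueue.take() plus an interrupt is an assumption about how B would be written, not the asker's actual code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

interface QueueHandler<T> {
    void requeue(T t);
    void completed(T t);
}

// Hypothetical stand-in for A: asks to be requeued a fixed number of times, then reports completion.
class Task implements Runnable {
    private final QueueHandler<Task> handler;
    private int requeuesLeft; // only touched by one thread at a time (hand-off goes through the queue)

    Task(QueueHandler<Task> handler, int requeues) {
        this.handler = handler;
        this.requeuesLeft = requeues;
    }

    @Override
    public void run() {
        if (requeuesLeft > 0) {
            requeuesLeft--;
            handler.requeue(this);   // "put me back on the queue"
        } else {
            handler.completed(this); // "I am fully processed"
        }
    }
}

// Hypothetical stand-in for C; dispatch() plays the role of B.
class Coordinator implements QueueHandler<Task> {
    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    private final CountDownLatch latch;
    private final AtomicInteger runs = new AtomicInteger();

    Coordinator(int taskCount, int requeuesPerTask) {
        latch = new CountDownLatch(taskCount); // one count per task, not per run
        for (int i = 0; i < taskCount; i++) {
            queue.add(new Task(this, requeuesPerTask));
        }
    }

    @Override
    public void requeue(Task t) { queue.add(t); }

    @Override
    public void completed(Task t) { latch.countDown(); }

    // Blocks on an empty queue instead of exiting; stops only when interrupted.
    private void dispatch() {
        try {
            while (true) {
                Task t = queue.take();
                runs.incrementAndGet();
                new Thread(t).start();
            }
        } catch (InterruptedException e) {
            // interrupted by runAll(): every task has completed
        }
    }

    // Returns the total number of task runs that were dispatched.
    int runAll() {
        Thread dispatcher = new Thread(this::dispatch);
        dispatcher.start();
        try {
            latch.await(); // wait for completions, not for an empty queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(new Coordinator(3, 2).runAll());
    }
}
```

With 3 tasks that each ask to be requeued twice, every task runs three times, so runAll() reports 9 runs. An empty queue in between never ends the wait, because the latch only counts completions.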

mdma
The latch is good, but the problem remains that B quickly removes all the elements from queueOfA, so B finishes fast but the latch is never counted down to zero, because A is no longer notifying C! So we really need to let A finish updating, then let B execute the remove, then...
baboonWorksFine
The thing is: B is initially set to run for a fixed time. During execution, if B's queue becomes empty, B stops executing immediately. But since an A may go back to the queue, A has to make sure that C puts it back on the queue before B finds the queue empty! Please help me out, Mr. mdma.
baboonWorksFine
I don't understand why A does not notify C - it's part of A's run method. You can have B use BlockingQueue.take() which blocks until an element is available - B never exits - the loop is unconditional. When C finds that the latch is 0 (after latch.await()) it can interrupt B so that B breaks out of the loop.
mdma
Do not exit threads when the queue is empty. I state that at the top of my answer. Have B loop indefinitely until it is interrupted by C. Another alternative is to put a "poison pill" in the queue: a special object that B looks for, and when it finds it, it breaks out of the loop.
mdma
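A minimal sketch of the "poison pill" idea from the comment above, under stated assumptions: PoisonPillDemo, POISON_PILL, drain and demo are hypothetical names, and the queue holds plain Runnables instead of A's. In the scenario from the question, C would add the pill only after latch.await() returns, so that no A can still ask to be requeued behind it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo {
    // Sentinel object; the consumer recognises it by identity, never runs it.
    static final Runnable POISON_PILL = new Runnable() {
        @Override
        public void run() { }
    };

    // Consumer loop (the role of B): blocks on an empty queue, stops only at the pill.
    static int drain(BlockingQueue<Runnable> queue) throws InterruptedException {
        int processed = 0;
        while (true) {
            Runnable item = queue.take();
            if (item == POISON_PILL) {
                return processed;
            }
            item.run();
            processed++;
        }
    }

    // Queues itemCount no-op items followed by the pill; returns how many items were processed.
    static int demo(int itemCount) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger(-1);
        Thread consumer = new Thread(() -> {
            try {
                processed.set(drain(queue));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < itemCount; i++) {
            queue.add(() -> { });
        }
        queue.add(POISON_PILL); // nothing may be queued after this point
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(5));
    }
}
```

Compared with interrupting B, the pill travels through the queue itself, so everything queued ahead of it is still processed before the consumer stops.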
A: 

I answered a similar question with a working code example; it is very close to what you need. All you would need to do is add an Observer to the thread that runs main.

http://stackoverflow.com/questions/3068147/image-processing-in-a-multhithreaded-mode-using-java/3072120#3072120

Romain Hippeau