views:

351

answers:

6

I'm using a thread that is continuously reading from a queue.

Something like:

public void run() {
    Object obj;
    while(true) {
        synchronized(objectsQueue) {
            if(objectesQueue.isEmpty()) {
                try {
                    objectesQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                obj = objectesQueue.poll();
            }
        }

        // Do something with the Object obj
    }
}

What is the best way to stop this thread?

I see two options:

1 - Since Thread.stop() is deprecated, I can implement a stopThisThread() method that uses a n atomic check-condition variable.

2 - Send a Death Event object or something like that to the queue. When the thread fetches a death event it exists.

I prefer the 1st way, however, I don't know when to call the stopThisThread() method, as something might be on it's way to the queue and the stop signal can arrive first (not desirable).

Any suggestions?

+1  A: 

Approach 1 is the preferred one.

Simply set a volatile stop field to true and call interrupt() on the running thread. This will force any I/O methods that wait to return with an InterruptedException (and if your library is written correctly this will be handled gracefully).

Joachim Sauer
Could the down-voter please comment on what's wrong with this answer?
Joachim Sauer
I just down voted. My reasoning is that it is unnecessary to use a separate stop field. Concurrency is a difficult topic. While there is nothing technically wrong with your answer, it also doesn't fit a best practice and I think it is in the interest of the community to limit exposure of answers which work but don't fit a best practice.
Tim Bender
+3  A: 

The DeathEvent (or as it is often call, "poison pill") approach works well if you need to complete all of the work on the queue before shutting down. The problem is that this could take a long time.

If you want to stop as soon as possible, I suggest you do this

BlockingQueue<O> queue = ...

...

public void run() {
   try {
       // The following test is necessary to get fast interrupts.  If
       // it is replaced with 'true', the queue will be drained before
       // the interrupt is noticed.  (Thanks Tim)
       while (!Thread.interrupted()) {
           O obj = queue.take();
           doSomething(obj);
       }
   } catch (InterruptedException ex) {
       // We are done.
   }
}

To stop the thread t that instantiated with that run method, simply call t.interrupt();.

If you compare the code above with other answers, you will notice how using a BlockingQueue and Thread.interrupt() simplifies the solution.

I would also claim that an extra stop flag is unnecessary, and in the big picture, potentially harmful. A well-behaved worker thread should respect an interrupt. An unexpected interrupt simply means that the worker is being run in a context that the original programmer did not anticipate. The best thing is if the worker to does what it is told to do ... i.e. it should stop ... whether or not this fits with the original programmer's conception.

Stephen C
@Stephen C: +1... In addition to that in JCIP it is recommended that if you get an unexpected interrupt you should restore the interrupted status using an *if(interrupted){ Thread.currenThread().interrupt() )}* construct in a *finally* block. Hmmmm so after all maybe that the better way is to both quit upon receiving an unexpected interrupt and restoring the interrupted status and then *also* allow for a "clean" stop using some *stopThisThread()* method or?
Webinator
@WizardOfOdds - I don't follow you. 1) There is no point in the `run` method calling `interrupt()` to restore the flag. The thread that called it is about to die. 2) Nothing you just said explains why stop by interrupt is bad or why a "clean" stop mechanism is better. (And a JCIP recommendation is *not* an explanation.)
Stephen C
@Stephen C: JCIP page 142, there's your "explanation". Btw I didn't pretend to have any explanation and ended up my comment with a question mark '?' in case you didn't notice :)
Webinator
@WizardOfOdds - not withstanding what JCIP might say, it clearly does not apply in this case.
Stephen C
@Stephen C: You should check the interrupt flag in the conditional of your while loop. According to the javadoc, a BlockingQueue implementation only has to throw an InterruptedException if it is waiting. If the queue is non-empty, it doesn't have to wait. Strictly adhering to the javadoc, your run method isn't guaranteed to exit quickly... or at all really.
Tim Bender
@Tim - while the javadoc doesn't explicitly state this, I think that a BlockingQueue implementation *has to* throw InterruptedException if the interrupted flag is already set. If it doesn't, there is an unfixable race condition where the flag gets set after the call to `Thread.isInterrupted()` and before the call to `take()`.
Stephen C
@Stephen C, I want to clarify. It is possible that so long as the queue is never empty, InterruptedException will never be thrown. If you look at the PriorityBlockingQueue and SynchronousQueue implementation you will see that both are allowed to return a value from take() before performing an interruptable blocking action.
Tim Bender
@Tim - Good point. I missed that.
Stephen C
A: 

I think your two cases actually exhibit the same potential behavior. For the second case consider Thread A adds the DeathEvent after which Thread B adds a FooEvent. When your job Thread receives the DeathEvent there is still a FooEvent behind it, which is the same scenario you are describing in Option 1, unless you try to clear the queue before returning, but then you are essentially keeping the thread alive, when what you are trying to do is stop it.

I agree with you that the first option is more desirable. A potential solution would depend on how your queue is populated. If it is a part of your work thread class you could have your stopThisThread() method set a flag that would return an appropriate value (or throw Exception) from the enqueuing call i.e.:

MyThread extends Thread{
  boolean running = true;

  public void run(){
    while(running){
      try{
        //process queue...
      }catch(InterruptedExcpetion e){
        ...
      }
    }
  }

  public void stopThisThread(){
    running = false;
    interrupt();
  }

  public boolean enqueue(Object o){
    if(!running){
       return false;
         OR
       throw new ThreadNotRunningException();
    }
    queue.add(o);
    return true;
  }
}

It would then be the responsibility of the object attempting to enqueue the Event to deal with it appropriately, but at the least it will know that the event is not in the queue, and will not be processed.

M. Jessup
You approach is wrong unless you define the running variable as volatile.It is also not recommended to extend Threads but rather implement Callable or Runnable , so you wont have the interrupt function in your scope
Roman
A: 

I usually put a flag in the class that has the Thread in it and in my Thread code I would do. (NOTE: Instead of while(true) I do while(flag))

Then create a method in the class to set the flag to false;

private volatile bool flag = true;

public void stopThread()
{
   flag = false;
}

    public void run() {
        Object obj;
        while(flag) {
            synchronized(objectsQueue) {
                if(objectesQueue.isEmpty()) {
                    try {
                        objectesQueue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    obj = objectesQueue.poll();
                }
            }

            // Do something with the Object obj
        }
    }
Romain Hippeau
That doesn't look thread safe to me. The `flag` variable is being read and written in different threads without proper synchronization.
Stephen C
@Romain Hippeau: I made your boolean flag *volatile*, otherwise, as Stephen C pointed out, your code ain't properly synchronized.
Webinator
@WizardOfOdds - Thanks for the correction
Romain Hippeau
A: 

In your reader thread have a boolean valible stop. When you wish for this thread to stop set thius to true and interrupt the thread. Within the reader thread when safe (when you don't have an unprocessed object) check the status of the stop variable and return out of the loop if set. as per below.

public class readerThread extends Thread{
    private volitile boolean stop = false;
    public void stopSoon(){
        stop = true;
        this.interrupt();
    }
    public void run() {
        Object obj;
        while(true) {
            if(stop){
                return;
            }
            synchronized(objectsQueue) {
            if(objectesQueue.isEmpty()) {
                try {
                    objectesQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(stop){
                    return;
                }    
                obj = objectesQueue.poll();
                // Do something with the Object obj
            }
        }
    }


}
public class OtherClass{
     ThreadReader reader;
     private void start(){
          reader = ...;
          reader.start();
     }

     private void stop(){
          reader.stopSoon();
          reader.join();     // Wait for thread to stop if nessasery.
     }
}
David Waters
A: 

Why not use a scheduler which you simply can stop when required? The standard scheduler supports repeated scheduling which also waits for the worker thread to finish before rescheduling a new run.

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleWithFixedDelay(myThread, 1, 10, TimeUnit.SECONDS);

this sample would run your thread with a delay of 10 sec, that means when one run finishes, it restarts it 10 seconds later. And instead of having to reinvent the wheel you get

service.shutdown()

the while(true) is not necessary anymore.

ScheduledExecutorService Javadoc

dube