Hi,

I am using the Executor framework to run several tasks on a thread pool created with newFixedThreadPool. I submit jobs with threadpool.submit(aThread), and this works fine. However, I need to determine when all the threads are complete so that I can continue with other processing. I looked at Future.get(), but it blocks until the result of that one task is available. I also looked at calling shutdown() and then repeatedly calling isTerminated(), with a sleep in between, to check whether all threads are complete, but this doesn't seem neat to me. Is there a cleaner way to do this?

Also, if an exception is raised in any one of the threads, I want to be able to terminate all other running threads and stop any queued tasks in the pool from starting. What is the best mechanism to do this?

I look forward to hearing your replies.

TIA

+1  A: 

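Use ExecutorService#shutdown() and then ExecutorService#awaitTermination(). shutdown() stops the pool from accepting new tasks but lets the ones already submitted finish; awaitTermination() blocks until they have all completed or the timeout expires.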

For example:

ExecutorService service = Executors.newCachedThreadPool();
service.submit(...);
service.submit(...);
service.shutdown();
service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

// All tasks have now finished

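To be notified when a task fails with an exception, you can provide a ThreadFactory to the ExecutorService that sets an uncaught exception handler on each thread it creates. This handler can then terminate the running tasks, for example by calling shutdownNow() on the pool. Note that the handler only fires for tasks started with execute(); with submit(), the exception is captured in the returned Future and rethrown, wrapped in an ExecutionException, from Future.get().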
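A minimal sketch of the handler approach, assuming the tasks are started with execute() (exceptions thrown by tasks passed to submit() end up in the Future and never reach the handler). The class name FailFastPool and the helper runAll are made up for illustration; only the java.util.concurrent calls are real API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FailFastPool {

    // Hypothetical helper: runs the tasks on a fixed pool and returns the first
    // uncaught exception, or null if every task that ran completed normally.
    static Throwable runAll(int threads, Runnable... tasks) throws InterruptedException {
        AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        AtomicReference<ExecutorService> poolRef = new AtomicReference<>();
        Queue<Thread> workers = new ConcurrentLinkedQueue<>();

        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                // Keep only the first failure, then interrupt the running
                // tasks and discard the queued ones.
                if (firstFailure.compareAndSet(null, ex)) {
                    poolRef.get().shutdownNow();
                }
            });
            workers.add(t);
            return t;
        };

        ExecutorService service = Executors.newFixedThreadPool(threads, factory);
        poolRef.set(service); // set before any worker thread can start
        try {
            for (Runnable task : tasks) {
                service.execute(task); // execute(), not submit()
            }
        } catch (RejectedExecutionException e) {
            // A task already failed and the handler shut the pool down.
        }
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        // The handler runs on the dying worker thread, which may finish after
        // the pool reports termination, so wait for the threads themselves.
        for (Thread worker : workers) {
            worker.join();
        }
        return firstFailure.get();
    }
}
```

shutdownNow() can only interrupt the running tasks, so they have to respond to interruption (for example by letting InterruptedException end the task) for the pool to stop promptly.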

Kevin
+1  A: 

One of the cleaner ways to approach this is by modifying the tasks that are submitted. By registering a callback with each task, it can notify of normal completion or an exception without any polling by the main thread.

You can write a simple wrapper that will do this for any Runnable.
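A rough sketch of what such a Runnable wrapper might look like. The Callback interface is an assumption: its method names complete() and failed() are taken from the calls in the Callable version, and the class name CallbackRunnable is made up for illustration.

```java
// Assumed callback shape; only the method names come from the answer.
interface Callback<T> {
    void complete(T result);
    void failed(Exception ex);
}

// Wraps a Runnable and reports its outcome to the callback.
class CallbackRunnable implements Runnable {

    private final Runnable task;
    private final Callback<Void> callback;

    CallbackRunnable(Runnable task, Callback<Void> callback) {
        this.task = task;
        this.callback = callback;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (RuntimeException ex) {
            // Report the failure, then rethrow so the executor still sees it.
            callback.failed(ex);
            throw ex;
        }
        // A Runnable has no result, so completion is reported with null.
        callback.complete(null);
    }
}
```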

Or, following that example, you can extend the idea to wrap any Callable.

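import java.util.concurrent.Callable;

// Wraps a Callable and reports its outcome to a callback.
// Callback is not shown here; it is assumed to declare
// complete(T result) and failed(Exception ex).
class CallbackTask<T>
  implements Callable<T>
{

  private final Callable<? extends T> task;

  private final Callback<T> callback;

  CallbackTask(Callable<? extends T> task, Callback<T> callback)
  {
    this.task = task;
    this.callback = callback;
  }

  @Override
  public T call()
    throws Exception
  {
    try {
      T result = task.call();
      // Normal completion: hand the result to the callback.
      callback.complete(result);
      return result;
    }
    catch (Exception ex) {
      // Failure: notify the callback, then rethrow so that the
      // Future returned by submit() still reports the exception.
      callback.failed(ex);
      throw ex;
    }
  }

}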
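
One way the wrapper might be wired up to stop the pool on the first failure is sketched below. The Callback interface is again an assumed shape, CallbackTask is repeated in compact form only so that the sketch compiles on its own, and CallbackDemo and runAll are made-up names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Assumed callback shape; only the method names come from the answer.
interface Callback<T> {
    void complete(T result);
    void failed(Exception ex);
}

// Compact copy of the wrapper from the answer.
class CallbackTask<T> implements Callable<T> {
    private final Callable<? extends T> task;
    private final Callback<T> callback;

    CallbackTask(Callable<? extends T> task, Callback<T> callback) {
        this.task = task;
        this.callback = callback;
    }

    @Override
    public T call() throws Exception {
        try {
            T result = task.call();
            callback.complete(result);
            return result;
        } catch (Exception ex) {
            callback.failed(ex);
            throw ex;
        }
    }
}

class CallbackDemo {

    // Hypothetical driver: returns the first exception reported by any task,
    // or null if every task that ran completed normally.
    static Exception runAll(int threads, List<Callable<String>> tasks)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicReference<Exception> firstFailure = new AtomicReference<>();

        Callback<String> stopOnFailure = new Callback<String>() {
            @Override
            public void complete(String result) {
                // Nothing to do on success in this sketch.
            }

            @Override
            public void failed(Exception ex) {
                // First failure wins: interrupt running tasks, drop queued ones.
                if (firstFailure.compareAndSet(null, ex)) {
                    pool.shutdownNow();
                }
            }
        };

        try {
            for (Callable<String> task : tasks) {
                pool.submit(new CallbackTask<String>(task, stopOnFailure));
            }
        } catch (RejectedExecutionException e) {
            // The pool was already stopped by a failing task.
        }
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        return firstFailure.get();
    }
}
```

Tasks interrupted by shutdownNow() will usually fail with an InterruptedException of their own; the compareAndSet keeps only the exception that triggered the shutdown.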
erickson