What is the simplest way to wait for all tasks of an ExecutorService to finish? My tasks are primarily computational, so I just want to run a large number of jobs, one on each core. Right now my setup looks like this:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases)
{   
    es.execute(new ComputeDTask(singleTable));
}
try 
{
    es.wait();
} 
catch (InterruptedException e) 
{
    e.printStackTrace();
}

ComputeDTask implements Runnable. This appears to execute the tasks correctly, but the code crashes on wait() with an IllegalMonitorStateException. This is odd, because I played around with some toy examples and it appeared to work.

uniquePhrases contains several tens of thousands of elements. Should I be using another method? I am looking for something as simple as possible.

+6  A: 

If you want to wait for all tasks to complete, call shutdown() instead of wait(), then follow it with awaitTermination().

Also, you can use Runtime.getRuntime().availableProcessors() to get the number of hardware threads, so you can size your thread pool properly.
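A minimal, self-contained sketch of the shutdown()/awaitTermination() approach with the pool sized from Runtime.getRuntime().availableProcessors(). The class ShutdownAndAwait, the method sumInParallel and the summing lambda are hypothetical stand-ins for ComputeDTask, and the one-hour timeout is an arbitrary assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ShutdownAndAwait {

    // Runs n small CPU-bound jobs (a stand-in for ComputeDTask) and waits for all of them.
    static long sumInParallel(int n) throws InterruptedException {
        // one worker per hardware thread
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(cores);
        AtomicLong total = new AtomicLong();

        for (int i = 1; i <= n; i++) {
            final long value = i;
            es.execute(() -> total.addAndGet(value));
        }

        es.shutdown(); // no new tasks accepted; queued and running tasks still complete
        // blocks until all tasks have finished or the timeout elapses
        if (!es.awaitTermination(1, TimeUnit.HOURS)) {
            throw new IllegalStateException("tasks did not finish within the timeout");
        }
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumInParallel(10_000)); // prints 50005000
    }
}
```

Because shutdown() itself does not wait, the awaitTermination() call is what actually blocks the calling thread.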

SB
shutdown() stops the ExecutorService from accepting new tasks and closes down idle worker threads. It is not specified to wait for the shutdown to complete and the implementation in ThreadPoolExecutor does not wait.
Alain O'Dea
@Alain - thanks. I should have mentioned awaitTermination. Fixed.
SB
@SB - Good stuff :) I love Stack Overflow.
Alain O'Dea
+3  A: 

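If you want to wait for the executor service to finish executing, call shutdown() and then awaitTermination(timeout, unit), e.g. awaitTermination(1, TimeUnit.MINUTES). The ExecutorService does not block on its own monitor, so you can't use wait() etc. wait() is Object.wait(), which throws IllegalMonitorStateException unless the calling thread holds the executor's monitor, and even inside a synchronized block nothing would notify it when the tasks finish.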
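To see why the original code fails, here is a small sketch (the class WaitVersusAwait and the method waitThrows are hypothetical) that calls wait() on the executor without holding its monitor, catches the resulting IllegalMonitorStateException, and then waits the supported way.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WaitVersusAwait {

    // Returns true if es.wait() outside a synchronized block throws IllegalMonitorStateException.
    static boolean waitThrows(ExecutorService es) throws InterruptedException {
        try {
            es.wait(); // Object.wait(): the caller must own es's monitor, and here it does not
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.execute(() -> System.out.println("task ran"));

        System.out.println("wait() threw: " + waitThrows(es));

        // the supported way to wait for the submitted tasks
        es.shutdown();
        boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("terminated: " + finished);
    }
}
```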

mdma
I think it's awaitTermination.
SB
@SB - Thanks - I see my memory is fallible! I've updated the name and added a link to be sure.
mdma
+4  A: 

If waiting for all tasks in the ExecutorService to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use a CompletionService -- specifically, an ExecutorCompletionService.

The idea is to create an ExecutorCompletionService wrapping your Executor, submit some known number of tasks through the CompletionService, then draw that same number of results from the completion queue using either take() (which blocks) or poll() (which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.

Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService in order to know how many things to try to draw out. This matters especially with the take() method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService.
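A minimal sketch of that pattern, assuming Java 8+ lambdas; the class CompletionServiceBatch and the squaring Callable are hypothetical stand-ins for real work. The loop draws exactly as many results with take() as tasks were submitted.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceBatch {

    // Squares each input on the pool and returns the sum, waiting for exactly this batch.
    static long sumOfSquares(ExecutorService es, int[] inputs)
            throws InterruptedException, ExecutionException {
        CompletionService<Long> cs = new ExecutorCompletionService<>(es);

        int submitted = 0;
        for (int x : inputs) {
            final long v = x;
            cs.submit(() -> v * v);
            submitted++;
        }

        long sum = 0;
        // take() exactly as many results as were submitted; one extra call would block
        // until some other task is submitted to cs
        for (int i = 0; i < submitted; i++) {
            sum += cs.take().get(); // results arrive in completion order, not submission order
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            System.out.println(sumOfSquares(es, new int[] {1, 2, 3, 4})); // prints 30
        } finally {
            es.shutdown();
        }
    }
}
```

The executor itself is left running between batches; only the pool's owner decides when to call shutdown().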

There are some examples showing how to use CompletionService in the book Java Concurrency in Practice.

seh
This is a good counterpoint to my answer -- I'd say the straightforward answer to the question is invokeAll(); but @seh has it right when submitting groups of jobs to the ES and waiting for them to complete... --JA
andersoj
A: 

You could wait for the jobs to finish, checking at a certain interval:

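int maxSecondsPerComputeDTask = 20;
es.shutdown(); // required: awaitTermination only returns true after shutdown() has been called
try
{
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS))
    {
        // consider giving up with a 'break' statement under certain conditions
    }
}
catch (InterruptedException e)
{
    Thread.currentThread().interrupt(); // preserve the interrupt status for callers
    throw new RuntimeException(e);
}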

Or you could use ExecutorService.submit(Runnable), collect the Future objects it returns, and call get() on each in turn to wait for them to finish.

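ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<Future<?>>();
for (DataTable singleTable : uniquePhrases)
{
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures)
{
    try
    {
        future.get(); // blocks until this task has finished
    }
    catch (InterruptedException e)
    {
        Thread.currentThread().interrupt(); // preserve the interrupt status for callers
        throw new RuntimeException(e);
    }
    catch (ExecutionException e)
    {
        // the task itself threw; e.getCause() is the original exception
        throw new RuntimeException(e);
    }
}
es.shutdown(); // let the worker threads exit once they are no longer needed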

InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.
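One way to honor an interrupt while waiting on a list of futures is sketched below. The class InterruptibleWait and the method awaitAll are hypothetical, and cancelling the remaining tasks is a design choice rather than something the answer prescribes: on interrupt it cancels whatever is still pending, restores the thread's interrupt status, and reports that the wait did not finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InterruptibleWait {

    // Waits for every future; if the waiting thread is interrupted, cancels the rest
    // and re-asserts the interrupt so callers further up the stack can see it.
    static boolean awaitAll(List<? extends Future<?>> futures) throws ExecutionException {
        for (Future<?> f : futures) {
            try {
                f.get();
            } catch (InterruptedException e) {
                for (Future<?> g : futures) {
                    g.cancel(true); // interrupts tasks still running; no effect on finished ones
                }
                Thread.currentThread().interrupt(); // restore the interrupt status
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            futures.add(es.submit(() -> { /* stand-in for ComputeDTask */ }));
        }
        System.out.println("all done: " + awaitAll(futures));
        es.shutdown();
    }
}
```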

Alain O'Dea
+5  A: 

The simplest approach is to use ExecutorService.invokeAll(), which does what you want in a one-liner. In your parlance, you'll need to modify or wrap ComputeDTask to implement Callable<>, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation of Callable.call(), but if not, here's a way to wrap it using Executors.callable().

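ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(uniquePhrases.size());

for (DataTable singleTable : uniquePhrases)
{
    // Executors.callable() adapts a Runnable into a Callable whose result is null
    todo.add(Executors.callable(new ComputeDTask(singleTable)));
}

// blocks until every task has completed; throws InterruptedException, so catch or declare it
List<Future<Object>> answers = es.invokeAll(todo);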

As others have pointed out, you could use the timeout version of invokeAll() if appropriate. In this example, answers is going to contain a bunch of Futures which will return nulls (see the definition of Executors.callable()). Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask, but I can't tell from your example.
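A sketch of the timeout overload, invokeAll(tasks, timeout, unit): per its contract, tasks that have not completed when the timeout elapses are cancelled before it returns. The class InvokeAllWithTimeout, the two Callables and the 200 ms timeout are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllWithTimeout {

    // Returns the futures from a timed invokeAll: one quick task, one that outlives the timeout.
    static List<Future<String>> runWithTimeout(ExecutorService es) throws InterruptedException {
        List<Callable<String>> todo = new ArrayList<>();
        todo.add(() -> "fast");
        todo.add(() -> {
            Thread.sleep(10_000); // far longer than the timeout below
            return "slow";
        });
        // returns once all tasks finish or 200 ms pass, whichever comes first;
        // anything still unfinished at that point is cancelled
        return es.invokeAll(todo, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            for (Future<String> f : runWithTimeout(es)) {
                System.out.println("done=" + f.isDone() + " cancelled=" + f.isCancelled());
            }
        } finally {
            es.shutdown();
        }
    }
}
```

Calling get() on a cancelled future throws CancellationException, so check isCancelled() before reading results.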

If it isn't clear, note that invokeAll() will not return until all the tasks have completed, either normally or by throwing an exception (i.e., all the Futures in your answers collection will report .isDone() if asked). This avoids all the manual shutdown, awaitTermination, etc., and allows you to reuse this ExecutorService neatly for multiple cycles, if desired.
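A self-contained sketch of the invokeAll() approach with Callables that return a value (string lengths here, a hypothetical stand-in for whatever ComputeDTask computes), reusing one pool for two batches. The class InvokeAllBatches and the method sumLengths are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllBatches {

    // Runs one batch of Callables on the shared pool; invokeAll returns only after all are done.
    static int sumLengths(ExecutorService es, List<String> words)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> todo = new ArrayList<>(words.size());
        for (String w : words) {
            todo.add(() -> w.length()); // stand-in for a ComputeDTask that returns a result
        }

        List<Future<Integer>> answers = es.invokeAll(todo);

        int total = 0;
        for (Future<Integer> f : answers) {
            total += f.get(); // does not block: every future is already done
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            // the same pool is reused for two separate batches
            System.out.println(sumLengths(es, Arrays.asList("alpha", "beta"))); // prints 9
            System.out.println(sumLengths(es, Arrays.asList("gamma")));         // prints 5
        } finally {
            es.shutdown();
        }
    }
}
```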

There are a few related questions on SO:

None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor/ExecutorService ought to be used.

andersoj
A: 

I have a similar situation: a set of documents to be crawled. I start with an initial "seed" document which should be processed; that document contains links to other documents which should also be processed, and so on.

In my main program, I just want to write something like the following, where Crawler controls a bunch of threads.

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion();

The same situation would arise if I wanted to navigate a tree: I would add the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree until there were none left.

I couldn't find anything in the JDK for this, which I thought was a bit surprising. So I wrote a class, AutoStopThreadPool, which one can either use directly or subclass to add methods suitable for the domain, e.g. schedule(Document). Hope it helps!
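For readers who want the same behavior with only java.util.concurrent, here is a minimal sketch of the idea. It is not Adrian Smith's AutoStopThreadPool (whose API is only known from the method names above); the class SelfStoppingPool, its counting scheme and the binary-tree usage are assumptions. Each task is counted before it is queued and uncounted after it finishes, so when the count reaches zero no task is running or queued and none can schedule more.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Adrian Smith's AutoStopThreadPool: the pool is finished
// once no scheduled task is queued or running.
public class SelfStoppingPool {
    private final ExecutorService es;
    private final AtomicInteger pending = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);

    public SelfStoppingPool(int threads) {
        es = Executors.newFixedThreadPool(threads);
    }

    // May be called from the main thread or from inside a running task.
    public void schedule(Runnable task) {
        pending.incrementAndGet(); // count the task before it can possibly finish
        es.execute(() -> {
            try {
                task.run();
            } finally {
                // any children were counted when task scheduled them, so zero means truly done
                if (pending.decrementAndGet() == 0) {
                    done.countDown();
                }
            }
        });
    }

    // Blocks until the pending count reaches zero, then lets the worker threads exit.
    public void waitUntilCompletion() throws InterruptedException {
        done.await();
        es.shutdown();
    }

    // Usage: visit the implicit binary tree 1 -> (2, 3) -> (4..7) ... up to maxNode.
    static int countTree(int maxNode) throws InterruptedException {
        SelfStoppingPool pool = new SelfStoppingPool(4);
        AtomicInteger visited = new AtomicInteger();
        visit(pool, 1, maxNode, visited);
        pool.waitUntilCompletion();
        return visited.get();
    }

    private static void visit(SelfStoppingPool pool, int node, int maxNode, AtomicInteger visited) {
        pool.schedule(() -> {
            visited.incrementAndGet();
            // each node schedules its children, like a crawler scheduling linked documents
            for (int child = 2 * node; child <= 2 * node + 1 && child <= maxNode; child++) {
                visit(pool, child, maxNode, visited);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countTree(15)); // prints 15
    }
}
```

This sketch is single-use, and waitUntilCompletion() blocks forever if nothing was ever scheduled, so schedule the seed task first.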

AutoStopThreadPool Javadoc | Download

Adrian Smith