I need to submit a number of tasks and then wait until all of their results are available. Each task adds a String to a Vector (which is synchronized by default). Then I need to start a new task for each result in the Vector, but only once all the previous tasks have finished their work. I want to use a Java Executor; in particular I tried Executors.newFixedThreadPool(100) in order to use a fixed number of threads (the number of tasks varies and can be 10 or 500), but I'm new to executors and I don't know how to wait for the tasks to terminate. This is roughly the pseudocode of what my program needs to do:

ExecutorService e = Executors.newFixedThreadPool(100);
while (true) {

    /* do something */

    for (...) {
        <start task>
    }

    <wait for all tasks to terminate>

    for each String in result {
        <start task>
    }

    <wait for all tasks to terminate>
}

I can't call e.shutdown() because I'm inside a while(true) loop and I need to reuse the ExecutorService. Can you help me? Can you also suggest a guide or book about Java executors?

+1  A: 

When you submit to an executor service, you'll get a Future object back.

Store those objects in a collection, and then call get() on each in turn. get() blocks until the underlying job completes, so once you have called get() on every Future, all the underlying jobs have finished.

e.g.

Collection<Future<?>> futures = ...
for (Future<?> f : futures) {
    Object result = f.get(); // blocks until this task has finished
    // maybe do something with the result. With a genericised
    // Future<T> the result would already be typed as T
}
System.out.println("Tasks completed");

Once all these have completed, then begin your second submission. Note that this might not be an optimal use of your thread pool, since it will become dormant, and then you're re-populating it. If possible try and keep it busy doing stuff.
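A runnable sketch of this answer applied to the question's loop, assuming placeholder work (the item-N strings and the toUpperCase call are hypothetical). One pool is created once and reused for every round; a bounded for loop stands in for while (true) so the example terminates, and shutdown() is called only after the loop:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPhaseWithFutures {
    // Runs `rounds` iterations of the question's loop on ONE reused pool and
    // returns how many second-phase tasks completed in total.
    static int runRounds(int rounds, int firstPhaseTasks)
            throws InterruptedException, ExecutionException {
        ExecutorService e = Executors.newFixedThreadPool(100);
        int secondPhaseDone = 0;
        try {
            for (int round = 0; round < rounds; round++) { // stands in for while (true)
                List<String> result = new Vector<>(); // synchronized, as in the question

                // Phase 1: submit the tasks and keep every Future.
                List<Future<?>> futures = new ArrayList<>();
                for (int i = 0; i < firstPhaseTasks; i++) {
                    final int id = i;
                    futures.add(e.submit(() -> { result.add("item-" + id); }));
                }
                for (Future<?> f : futures) {
                    f.get(); // blocks until this task has finished
                }

                // Phase 2: every phase-1 task is done, so `result` is complete.
                List<Future<String>> second = new ArrayList<>();
                for (String s : result) {
                    Callable<String> task = () -> s.toUpperCase(); // placeholder work
                    second.add(e.submit(task));
                }
                for (Future<String> f : second) {
                    f.get();
                    secondPhaseDone++;
                }
            }
        } finally {
            e.shutdown(); // called once, after the loop, never between phases
        }
        return secondPhaseDone;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runRounds(3, 10)); // prints 30
    }
}
```

Because the pool is never shut down between phases, it can be reused for as many rounds as needed; the trade-off is that it sits idle while each phase drains.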

Brian Agnew
What's the difference between e.submit (I think I need to use it, following your example) and e.execute?
Raffo
The difference is that with submit you get a Future back and with execute you don't. If you use your own `ThreadFactory` with an `UncaughtExceptionHandler`, then `execute` will cause the handler to receive any uncaught exceptions, whereas `submit` will not - you would only get the exceptions via the `Future`'s `get` method
oxbow_lakes
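A minimal sketch of the difference described in this comment, assuming a hypothetical ThreadFactory whose threads carry an UncaughtExceptionHandler. The exception thrown by the execute() task reaches the handler, while the one thrown by the submit() task only surfaces as the cause of an ExecutionException from Future.get():

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

public class SubmitVsExecute {
    // Returns {message seen by the UncaughtExceptionHandler, message unwrapped from Future.get()}.
    static String[] demo() throws InterruptedException {
        AtomicReference<String> seenByHandler = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                seenByHandler.set(ex.getMessage());
                handled.countDown();
            });
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(2, factory);
        try {
            // execute(): the exception escapes the worker thread and reaches the handler.
            pool.execute(() -> { throw new IllegalStateException("from execute"); });
            handled.await(5, TimeUnit.SECONDS);

            // submit(): the exception is captured in the Future; the handler is not called.
            Runnable failing = () -> { throw new IllegalStateException("from submit"); };
            Future<?> future = pool.submit(failing);
            String fromGet = null;
            try {
                future.get();
            } catch (ExecutionException e) {
                fromGet = e.getCause().getMessage(); // the original exception is the cause
            }
            return new String[] { seenByHandler.get(), fromGet };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String[] r = demo();
        System.out.println("handler saw: " + r[0]);
        System.out.println("get() threw: " + r[1]);
    }
}
```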
+7  A: 

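The ExecutorService gives you a mechanism, invokeAll, to execute multiple tasks simultaneously and get a collection of Future objects back (one per task, representing its asynchronous computation). invokeAll itself blocks until every task has completed, so the Futures it returns are already done.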

Collection<Callable<Object>> tasks = new LinkedList<Callable<Object>>();
//populate tasks
for (Future<Object> f : executorService.invokeAll(tasks)) {
    f.get(); // invokeAll has already waited; get() returns the result or throws ExecutionException
}

If you have Runnables instead of Callables, you can easily turn a Runnable into a Callable<Object> using the method:

Callable<Object> c = Executors.callable(runnable);
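A self-contained sketch of the invokeAll approach applied to both phases of the question, assuming placeholder tasks (adding item-N strings, then upper-casing them). The phase 1 work is written as Runnables and wrapped with Executors.callable, and the same pool is passed in and reused for every round:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.*;

public class InvokeAllPhases {
    // One iteration of the question's loop, using invokeAll for both phases.
    static List<String> runOnce(ExecutorService pool, int n)
            throws InterruptedException, ExecutionException {
        List<String> result = new Vector<>();

        // Phase 1: the tasks are Runnables, so wrap each one.
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            Runnable r = () -> result.add("item-" + id);
            tasks.add(Executors.callable(r)); // Callable<Object> whose result is null
        }
        for (Future<Object> f : pool.invokeAll(tasks)) { // returns once all tasks are done
            f.get(); // no waiting here; rethrows a task failure as ExecutionException
        }

        // Phase 2: one task per collected string (placeholder work).
        List<Callable<String>> second = new ArrayList<>();
        for (String s : result) {
            second.add(() -> s.toUpperCase());
        }
        List<String> out = new ArrayList<>();
        for (Future<String> f : pool.invokeAll(second)) {
            out.add(f.get());
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(100);
        try {
            for (int round = 0; round < 3; round++) { // the same pool is reused each round
                System.out.println(runOnce(pool, 5).size());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```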
oxbow_lakes
+4  A: 

Can you suggest me a guide/book about java executors??

I can answer this part:

Java Concurrency in Practice by Brian Goetz (with Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes and Doug Lea) is most likely your best bet.

It's not only about executors though; it covers the java.util.concurrent package in general, as well as basic concurrency concepts and techniques, and some advanced topics such as the Java memory model.

Jonik
+6  A: 

Rather than submitting Runnables or Callables to an Executor directly and storing the corresponding Future return values I'd recommend using a CompletionService implementation to retrieve each Future when it completes. This approach decouples the production of tasks from the consumption of completed tasks, allowing for example new tasks to originate on a producer thread over a period of time.

Collection<Callable<Result>> workItems = ...
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletionService<Result> compService = new ExecutorCompletionService<Result>(executor);

// Add work items to Executor.
for (Callable<Result> workItem : workItems) {
  compService.submit(workItem);
}

// Consume results as they complete (this would typically occur on a different thread).
for (int i=0; i<workItems.size(); ++i) {
  Future<Result> fut = compService.take(); // Will block until a result is available.
  Result result = fut.get(); // Extract result; this will not block.
}
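A runnable sketch of the CompletionService pattern, using a small fixed pool and a hypothetical squaring task in place of Result. take() hands back each Future as its task finishes, so the returned list is in completion order rather than submission order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class CompletionOrder {
    // Squares each input on a pool and collects the results in the order the tasks finish.
    static List<Integer> squaresAsCompleted(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CompletionService<Integer> compService = new ExecutorCompletionService<>(executor);
        try {
            for (int x : inputs) {
                compService.submit(() -> x * x); // a Callable<Integer>
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                Future<Integer> fut = compService.take(); // blocks until the next task finishes
                results.add(fut.get()); // already complete, so this does not block
            }
            return results; // completion order, not necessarily submission order
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squaresAsCompleted(Arrays.asList(1, 2, 3, 4)));
    }
}
```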
Adamski
In practice this involves more LOC than my example above. A CompletionService is generally useful if you are submitting tasks from many locations but want to handle task completions consistently (for example, to perform some other computation), which you only want to define once.
oxbow_lakes
@oxbow: Or if you want to start processing results as soon as the first task completes! Otherwise you could be waiting for your slowest task while the others are already done. (Adamski +1)
Tim
@Tim - The OP said quite clearly he wanted to wait until all the tasks are finished, so it makes no difference (other than a few nanoseconds) which task finishes first.
oxbow_lakes
This approach makes me think about how I can improve my code; maybe I can make things faster with something similar. It also raises another problem: I'm NOT sure that my tasks will end, so how can I add some sort of timeout?
Raffo
@Raffale: It's probably worth posting this as a separate question for clarity (unless one exists already that is).
Adamski
You're right, I'm gonna look a little more into these examples and I'll ask a new question if necessary.
Raffo
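For the timeout question, one option in the standard library (not something proposed in this thread) is the timed overload invokeAll(tasks, timeout, unit), which cancels every task that has not completed when the time runs out. A hedged sketch, with a hypothetical countFinished helper and a deliberately slow task:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllTimeout {
    // Returns how many tasks finished successfully within the time limit.
    static int countFinished(List<Callable<String>> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            int finished = 0;
            for (Future<String> f : pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS)) {
                if (f.isCancelled()) {
                    continue; // did not complete before the timeout, so invokeAll cancelled it
                }
                try {
                    f.get(); // done, so this does not block
                    finished++;
                } catch (ExecutionException e) {
                    // the task completed by throwing; not counted as finished
                }
            }
            return finished;
        } finally {
            pool.shutdownNow(); // interrupt anything still running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Callable<String> fast = () -> "fast";
        Callable<String> slow = () -> { Thread.sleep(10_000); return "slow"; };
        System.out.println(countFinished(Arrays.asList(fast, slow), 500));
    }
}
```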
A: 
ExecutorService executor = ...
//submit tasks
executor.shutdown(); // previously submitted tasks are executed, 
                     // but no new tasks will be accepted
while(!executor.awaitTermination(1, TimeUnit.SECONDS))
    ;

There's no easy way to do what you want without creating a custom ExecutorService.
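Since the question rules out shutting down a reused pool, one workaround (an assumption here, not something this answer states) is to give each phase a fresh pool and use the shutdown/awaitTermination loop to wait for it. The runPhase and oneIteration names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.*;

public class ShutdownPerPhase {
    // Runs one phase on a pool of its own and waits for it by shutting that pool down.
    static void runPhase(List<Runnable> tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(100);
        for (Runnable task : tasks) {
            executor.execute(task);
        }
        executor.shutdown(); // already-submitted tasks still run; new ones are rejected
        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            // still running; a real program could log progress or give up here
        }
    }

    // Hypothetical driver for one iteration of the question's while (true) loop.
    static List<String> oneIteration(int n) throws InterruptedException {
        List<String> result = new Vector<>();
        List<String> processed = new Vector<>();

        List<Runnable> first = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            first.add(() -> result.add("item-" + id));
        }
        runPhase(first); // returns only after every phase-1 task has finished

        List<Runnable> second = new ArrayList<>();
        for (String s : result) {
            second.add(() -> processed.add(s.toUpperCase())); // placeholder work
        }
        runPhase(second);
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(oneIteration(10).size()); // prints 10
    }
}
```

This pays the cost of creating threads for every phase, which the reused-pool approaches in the other answers avoid.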

Jarek Przygódzki