I'm looking for a class where I can override a method to do the work, and return the results like an iterator. Something like this:

ParallelWorkIterator<Result> itr = new ParallelWorkIterator<Result>(trials,threads) {

  public Result work() {
    //do work here for a single trial...
    return answer;
  }

};
while (itr.hasNext()) {
  Result result = itr.next();
  //process result...
}

This is mainly going to be used for things like Monte Carlo simulations, but I don't want to have to deal with setting up thread pools and managing returning threads every time. I rolled my own class that hopefully accomplishes this, but I'm not confident enough in it and thought I'd check whether something like this already exists.

Edit: To be clear, I want it to keep running in the background and queuing results after each work method returns until all trials have been completed. So the next method may wait to return until there is a result in the queue.

+8  A: 

Have a look at the ExecutorCompletionService. It does everything you want.

   void solve(Executor e, Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
       //This class will hold and execute your tasks
       CompletionService<Result> ecs
           = new ExecutorCompletionService<Result>(e);
       //Submit (start) all the tasks asynchronously
       for (Callable<Result> s : solvers)
           ecs.submit(s);
       //Retrieve completed task results and use them
       int n = solvers.size();
       for (int i = 0; i < n; ++i) {
           Result r = ecs.take().get();
           if (r != null)
               use(r);
       }
   }

The benefit of using a CompletionService is that take() hands back results in the order the tasks finish, not the order they were submitted. You never sit waiting on one slow task while finished results pile up behind it, and the tasks that have not completed yet keep running in the background.

Tim
Thanks, that looks like it does everything I want. I just wish java wasn't so verbose.
job
verbose and unambiguous are one half of a double-edged sword.
Jason S
What I did to handle the verbosity is write a wrapper class that extends ExecutorCompletionService, and keeps a list of submitted Futures. A public boolean hasRemaining(){return 0<futures.size();} enables me to take/poll while(ecs.hasRemaining()), significantly reducing the verbosity once your tasks have been submitted.
Tim
@Tim That's true, I could probably just wrap it up inside the class I've already written.
job
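For reference, the wrapper idea from the comments above might look roughly like the sketch below. It is not the commenter's actual class: it keeps a counter instead of a list of Futures, the names CountingCompletionService and CountingDemo are made up for illustration, and it only tracks tasks that go through submit(Callable) and come back through take().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: counts tasks that were submitted but not yet taken.
class CountingCompletionService<V> extends ExecutorCompletionService<V> {
    private final AtomicInteger outstanding = new AtomicInteger();

    CountingCompletionService(Executor executor) {
        super(executor);
    }

    @Override
    public Future<V> submit(Callable<V> task) {
        outstanding.incrementAndGet();
        try {
            return super.submit(task);
        } catch (RuntimeException e) {
            // The executor rejected the task, so it will never produce a result.
            outstanding.decrementAndGet();
            throw e;
        }
    }

    @Override
    public Future<V> take() throws InterruptedException {
        Future<V> done = super.take(); // blocks until some task has finished
        outstanding.decrementAndGet();
        return done;
    }

    // True while at least one submitted task has not been taken yet.
    public boolean hasRemaining() {
        return outstanding.get() > 0;
    }
}

class CountingDemo {
    // Submits `tasks` trivial tasks and returns how many results were taken.
    static int drain(int tasks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CountingCompletionService<Integer> ecs =
                    new CountingCompletionService<>(executor);
            for (int i = 0; i < tasks; i++) {
                final int n = i;
                ecs.submit(() -> n); // placeholder work
            }
            int taken = 0;
            while (ecs.hasRemaining()) {
                ecs.take().get();
                taken++;
            }
            return taken;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drain(6, 3)); // prints 6
    }
}
```

The poll() variants and submit(Runnable, V) are left alone here, so mixing them with hasRemaining() would throw the count off.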
+2  A: 

I would recommend looking at Java Executors.

You submit a number of tasks and get a Future object back for each one. Your work is processed in the background, and you iterate through the Future objects (like you do in the above). Each Future returns its result as it becomes available when you call get(), which blocks until the result has been generated in a separate thread.
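A minimal sketch of that pattern, assuming a fixed thread pool and a placeholder task that just squares the trial number (the class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureIterationSketch {

    // Runs `trials` tasks on `threads` workers; results come back in submission order.
    static List<Integer> runTrials(int trials, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < trials; i++) {
                final int trial = i;
                // Placeholder work: a real simulation trial would go here.
                Callable<Integer> task = () -> trial * trial;
                futures.add(executor.submit(task));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                // Blocks until this particular task is done, even if later ones finished first.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTrials(5, 2)); // prints [0, 1, 4, 9, 16]
    }
}
```

Note that iterating the Futures in submission order means one slow early task holds up results that are already finished behind it.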

Brian Agnew
+1  A: 

The closest thing I can think of is to use a CompletionService to accumulate results as they complete.

Simple example:

ExecutorService executor = Executors.newSingleThreadExecutor(); // Create vanilla executor service.
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor); // Completion service wraps executor and is notified of results as they complete.
Callable<Result> callable = new MyCallable();

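completionService.submit(callable); // Submit through the CompletionService (not the raw executor) so that take() sees the result. Do not store the Future here but rather obtain it from the CompletionService when we *know* the result is complete.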

Future<Result> fut = completionService.take(); // Will block until a completed result is available.
Result result = fut.get(); // Will not block as we know this future represents a completed result.

I would not recommend wrapping this behind an Iterator interface as the Future get() method can throw two possible checked exceptions: ExecutionException and InterruptedException, and you would therefore need to catch and either swallow these or rethrow them as RuntimeExceptions, neither of which is a very nice thing to do. In addition, your Iterator's hasNext() or next() methods would potentially need to block if there was a task in progress, which could be considered counterintuitive for clients using an Iterator. Instead I would implement my own more descriptive interface; e.g.

public interface BlockingResultSet {
  /**
   * Returns next result when it is ready, blocking if required.
   * Returns null if no more results are available.
   */  
  Result take() throws InterruptedException, ExecutionException;
}

(Methods called take() typically represent a blocking call in the java.util.concurrent package).
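One way such an interface could be backed by a CompletionService is sketched below. This is an assumption-laden illustration rather than a finished design: it uses a generic variant of the interface (named BlockingResults here), the class names are hypothetical, it expects a single consumer thread, and it cannot tell a task that returns null apart from the end of the results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Generic variant of the interface above (hypothetical name).
interface BlockingResults<T> {
    // Returns the next result, blocking if required; null when none are left.
    T take() throws InterruptedException, ExecutionException;
}

// Not thread-safe: assumes one thread calls take().
class CompletionServiceResults<T> implements BlockingResults<T> {
    private final CompletionService<T> completionService;
    private int remaining; // tasks submitted but not yet taken

    CompletionServiceResults(ExecutorService executor, List<Callable<T>> tasks) {
        completionService = new ExecutorCompletionService<>(executor);
        for (Callable<T> task : tasks) {
            completionService.submit(task);
            remaining++;
        }
    }

    @Override
    public T take() throws InterruptedException, ExecutionException {
        if (remaining == 0) {
            return null; // every submitted task has been consumed
        }
        Future<T> done = completionService.take(); // blocks until a task finishes
        remaining--;
        return done.get(); // does not block; may throw ExecutionException
    }
}

class BlockingResultsDemo {
    // Sums the squares 1^2 .. trials^2, consuming results as they complete.
    static int sumOfSquares(int trials, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= trials; i++) {
                final int n = i;
                tasks.add(() -> n * n); // placeholder work
            }
            BlockingResults<Integer> results =
                    new CompletionServiceResults<>(executor, tasks);
            int sum = 0;
            Integer r;
            while ((r = results.take()) != null) {
                sum += r;
            }
            return sum;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(4, 2)); // prints 30
    }
}
```

Because the checked exceptions stay in the signature, the caller decides how to handle a failed or interrupted trial instead of having that choice buried inside an Iterator.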

Adamski
Your code lines are rather long...
pjp