I have been working with the example code from the ExecutorCompletionService javadoc and put together the following example. The code in solve() works as expected and prints
1
2
3
4
5
The code in solve2() doesn't print anything and in fact never exits. It doesn't matter whether ecs is constructed before or after submitting the jobs to the ExecutorService.

Is there no way to use the CompletionService construct with FutureTasks? I have rewritten my production code to get() the results of the FutureTasks directly, rather than trying to get() them from the ExecutorCompletionService, but for now that has resulted in some messy-looking code. In short, what's wrong with solve2 below? Thanks.

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class sample {
public static class stringCallable implements Callable<String> {
 String mstring;

 stringCallable(String s) {mstring = s;}

 // Returns the string passed to the constructor.
 @Override
 public String call() throws Exception {
  return mstring;
 }
}

public static void main(String[] args) {
 // Five trivial tasks, plus one executor for each solver.
 ArrayList<Callable<String>> list = new ArrayList<Callable<String>>();
 ExecutorService es = Executors.newFixedThreadPool(1);
 Executor e = Executors.newSingleThreadExecutor();
 list.add(new stringCallable("1"));
 list.add(new stringCallable("2"));
 list.add(new stringCallable("3"));
 list.add(new stringCallable("4"));
 list.add(new stringCallable("5"));

 try {
  solve(e, list);
 } catch (InterruptedException e1) {
  // TODO Auto-generated catch block
  e1.printStackTrace();
 } catch (ExecutionException e1) {
  // TODO Auto-generated catch block
  e1.printStackTrace();
 }
 System.out.println ("Starting Solver 2");

 try {
  solve2(es, list);
 } catch (InterruptedException e1) {
  // TODO Auto-generated catch block
  e1.printStackTrace();
 } catch (ExecutionException e1) {
  // TODO Auto-generated catch block
  e1.printStackTrace();
 }
}

static void solve(Executor e, Collection<Callable<String>> solvers)throws InterruptedException, ExecutionException {
 CompletionService<String> ecs = new ExecutorCompletionService<String>(e);
 for (Callable<String> s : solvers)
     ecs.submit(s);
 int n = solvers.size();
 for (int i = 0; i < n; ++i) {
  String r = ecs.take().get();
  if (r != null)
   use(r);
 }
}

static void solve2(ExecutorService e, Collection<Callable<String>> solvers)throws InterruptedException, ExecutionException {
    for (Callable<String> s : solvers){
     FutureTask<String> f = new FutureTask<String>(s);
       e.submit(f);
    }
    CompletionService<String> ecs = new ExecutorCompletionService<String>(e);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
     String r = ecs.take().get();
     if (r != null)
      use(r);
    }
}

private static void use(String r) {
 System.out.println (r);
}

}

A: 

This is how I would implement it:

static void solve2(ExecutorService e, Collection<Callable<String>> solvers)throws InterruptedException, ExecutionException {
 CompletionService<String> ecs = new ExecutorCompletionService<String>(e);
 for (Callable<String> s : solvers){
  ecs.submit(s);
 }
 int n = solvers.size();
 for (int i = 0; i < n; ++i) {
  String r = ecs.take().get();
  if (r != null)
   use(r);
 }
}

The ExecutorCompletionService is "just" a wrapper around an Executor, but you must submit your callables through the ECS: it wraps each task so that, when the task finishes, its Future is placed onto an internal queue. That completed Future is then available via take() or poll(). If you submit a callable directly to the ExecutorService, the ECS cannot know about its completion. The ECS javadoc says the same thing, with a good example and an even better explanation. I also advise you to take a look at the source code of java.util.concurrent.ExecutorCompletionService.
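To make the wrapping idea concrete, here is a minimal sketch of the mechanism described above. It is not the JDK source: the class names QueueingSketch and QueueingTask and the helper runOne are hypothetical, made up for illustration. It relies only on the fact that FutureTask calls its protected done() hook when a task finishes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class QueueingSketch {

    // Hypothetical stand-in for the wrapping idea; not the JDK implementation.
    static class QueueingTask<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> completed;

        QueueingTask(Callable<V> task, BlockingQueue<Future<V>> completed) {
            super(task);
            this.completed = completed;
        }

        // FutureTask invokes done() once the task has finished or was cancelled.
        @Override
        protected void done() {
            completed.add(this);
        }
    }

    // Runs one wrapped task and returns the result taken from the queue.
    static String runOne(String value) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        BlockingQueue<Future<String>> completed = new LinkedBlockingQueue<Future<String>>();
        try {
            es.execute(new QueueingTask<String>(() -> value, completed));
            // Blocks until done() has put the finished task on the queue.
            return completed.take().get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOne("1"));
    }
}
```

Only tasks that pass through the wrapper ever reach the queue, which is why a task handed straight to the executor is never seen by take().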

kth
Yeah, the ECS javadoc is where I got the first solution from, but we had a preconception about the FutureTask thing that was getting in the way. Thanks!
Jim
A: 

In solve2, when you create an ExecutorCompletionService on top of the existing ExecutorService, the tasks already submitted to that ExecutorService are invisible to the wrapper: it keeps its own completion queue (a LinkedBlockingQueue by default), and only tasks submitted through the wrapper are added to it on completion. Previously submitted tasks are not inherited. So your code blocks forever at ecs.take().get(), because the ExecutorCompletionService itself has never been given any tasks.
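The blocking can be observed without hanging by using poll() with a timeout instead of take(). The sketch below is only an illustration: the class name DirectSubmitDemo, the helper directSubmit and the 200 ms timeout are arbitrary choices, not part of the original code.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class DirectSubmitDemo {

    // Returns the FutureTask's own result, then what the ECS saw.
    static String directSubmit() {
        ExecutorService es = Executors.newFixedThreadPool(1);
        try {
            CompletionService<String> ecs = new ExecutorCompletionService<String>(es);
            FutureTask<String> f = new FutureTask<String>(() -> "1");
            es.submit(f);          // bypasses the ECS, as solve2 does
            String own = f.get();  // the task itself does run to completion
            // Nothing was submitted through the ECS, so its queue stays empty
            // and poll() returns null once the timeout expires.
            Future<String> seen = ecs.poll(200, TimeUnit.MILLISECONDS);
            return own + "/" + (seen == null ? "nothing queued" : seen.get());
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(directSubmit()); // prints "1/nothing queued"
    }
}
```

The task's result is still reachable through the FutureTask itself, but take() on the wrapper would wait indefinitely for something that never arrives.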

Also, you don't need to create FutureTasks yourself to submit to the ExecutorCompletionService. The Futures are already created for you internally. That's why you get a Future<String> when calling ecs.take().

Given this, your solve2 function is unnecessary. You are already doing it correctly in solve.
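As a small illustration of plain Callables going in and Future<String>s coming out, the sketch below submits two tasks through the ECS and collects them in the order they finish. The class name CompletionOrderDemo, the helper completionOrder and the latch that holds back the "slow" task are assumptions added for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits plain Callables and returns their results in completion order.
    static List<String> completionOrder() {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> ecs = new ExecutorCompletionService<String>(es);
            CountDownLatch release = new CountDownLatch(1);

            // No FutureTask is created here; the ECS wraps each Callable itself.
            ecs.submit(() -> { release.await(); return "slow"; });
            ecs.submit(() -> "fast");

            List<String> order = new ArrayList<String>();
            order.add(ecs.take().get()); // only "fast" can have finished so far
            release.countDown();         // now let the first task complete
            order.add(ecs.take().get());
            return order;
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [fast, slow]
    }
}
```

The shutdown() call in the finally block also matters for exiting: pool threads created by Executors.newFixedThreadPool are non-daemon, so a program that never shuts the pool down can stay alive even after all results have been consumed.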

bruno conde
Thanks. solve() came directly from the Java site. A coworker and I couldn't figure out why the second version (which is essentially how we were doing it in our code) wouldn't work. Thanks!
Jim