Hi, I am stuck with the following problem. Say I have a request with 1000 items, and I would like to use a Java Executor to process them.

Here is the main method:

public static void main(String[] args) {

     // Assume that I have a request object that contains an ArrayList of names,
     // and vectorList is the container for each request's result

     ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
     Vector<Result> vectorList = new Vector<Result>();

     for (int i = 0; i < request.size(); i++) {
          threadExecutor.execute(new QueryTask(request.get(i).getNames(), vectorList));
     }

     threadExecutor.shutdown();

     response.setResult(vectorList);

}

And here is the QueryTask class:

public class QueryTask implements Runnable {

    private String names;
    private Vector<Result> vectorList;

    public QueryTask(String names, Vector<Result> vectorList) {
          this.names = names;
          this.vectorList = vectorList;
    }

    public void run() {
         // do something with names, for example, query the database
         // (process() stands in for that logic)
         Result result = process(names);

         // add the result to vectorList
         vectorList.add(result);
    }

}

So, based on the example above, I want to use a thread pool to process each item in the request concurrently and add each result to vectorList. At the end of the process, I want all the results to already be in vectorList.

I keep getting inconsistent results in the response. For example, if I pass a request with 10 names, I get back only 3 or 4 results, or sometimes nothing at all. I expected that if I pass 10, I will get 10 back.

Does anyone know what's causing the problem?

Any help would be appreciated.

Thanks

A: 

There are at least 2 issues here.

1. In your main, you call shutdown() on the ExecutorService and then try to get the results out right away. shutdown() only stops the executor from accepting new jobs; it does not wait for the submitted ones. The executor service runs your jobs asynchronously, so there is a very good chance that not all of them are done yet. When you call response.setResult(vectorList), vectorList is not fully populated.

2. You are concurrently accessing the same Vector object from within all of your runnables. This is likely to cause ConcurrentModificationExceptions, or just clobber stuff in the vector. You need to either manually synchronize on the vector inside QueryTask, or pass in a thread-safe container instead, like Collections.synchronizedList(new ArrayList<Result>());

wolfcastle
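Both fixes from this answer can be combined in a minimal sketch. It assumes a hypothetical process(String) method standing in for the database query, and uses plain Strings instead of the OP's Result type. The pool is shut down, then awaitTermination() blocks until every task has finished, and the tasks add to a Collections.synchronizedList:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SynchronizedListExample {

    // Hypothetical stand-in for the OP's per-item work (e.g. a database query).
    static String process(String name) {
        return "result for " + name;
    }

    // Runs one task per name on a 3-thread pool and returns only after
    // every task has added its result to the thread-safe list.
    static List<String> processAll(List<String> names) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<String> results = Collections.synchronizedList(new ArrayList<String>());

        for (String name : names) {
            executor.execute(() -> results.add(process(name)));
        }

        executor.shutdown(); // stop accepting new tasks; does not wait
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { // blocks until done or timeout
            executor.shutdownNow();
            throw new IllegalStateException("tasks did not finish within the timeout");
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            names.add("name" + i);
        }
        System.out.println(processAll(names).size()); // prints 10
    }
}
```

The 1-minute timeout is an arbitrary choice for the sketch. Note that the results end up in completion order, not request order.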
Point 2 should not happen, as Vector is thread-safe.
Steven
@Steven - thanks. I was thinking of Lists in general and forgot about Vector (I personally never use the class). How it is being used here is okay, but if QueryTask were doing something more complex than just adding to it (i.e., iterating over it), you would still need some additional concurrency control, even though Vector is synchronized.
wolfcastle
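To illustrate the point about iteration: Vector locks each individual method call, but a for-each loop is many calls through an Iterator, so another thread can add an element between them and the iterator can then fail fast with a ConcurrentModificationException. A minimal sketch, using a hypothetical totalLength helper, holds the vector's own lock for the whole loop:

```java
import java.util.Vector;

class VectorIterationExample {

    // Sums the lengths of all strings. The synchronized block takes the same
    // monitor that Vector's own synchronized methods use, so no other thread
    // can add to the vector until the loop has finished.
    static int totalLength(Vector<String> results) {
        int total = 0;
        synchronized (results) {
            for (String r : results) {
                total += r.length();
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        Vector<String> results = new Vector<>();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                results.add("x"); // each add() waits while the loop holds the lock
            }
        });
        writer.start();
        System.out.println(totalLength(results)); // some value between 0 and 1000
        writer.join();
        System.out.println(totalLength(results)); // prints 1000
    }
}
```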
+1  A: 

After calling threadExecutor.shutdown(), you also need to call threadExecutor.awaitTermination(timeout, unit). The former is a non-blocking call that merely initiates a shutdown, whereas the latter is a blocking call that waits until all tasks have finished (or the timeout elapses). Since you only call the former, you are probably returning before all tasks have finished, which is why you don't always get back all of your results. The Java API isn't too clear on this, so someone filed a bug about it.

Yevgeniy Brikman
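The difference between the two calls can be seen in a small sketch. A CountDownLatch (an assumption of this example, standing in for a slow query) keeps the single task from finishing until after the check, so isTerminated() is reliably false right after shutdown(), while awaitTermination() blocks until the task completes:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownVsAwaitExample {

    // Returns {terminated right after shutdown(), terminated within awaitTermination()}.
    static boolean[] demo() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CountDownLatch release = new CountDownLatch(1);

        executor.execute(() -> {
            try {
                release.await(); // the "slow query": blocks until main releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.shutdown(); // returns immediately; the task is still pending
        boolean terminatedAfterShutdown = executor.isTerminated();

        release.countDown(); // let the task finish
        boolean terminatedInTime = executor.awaitTermination(10, TimeUnit.SECONDS); // blocks

        return new boolean[] { terminatedAfterShutdown, terminatedInTime };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println("after shutdown(): " + r[0]);         // prints false
        System.out.println("after awaitTermination(): " + r[1]); // prints true
    }
}
```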
Wrong, the OP needs both calls.
Tim Bender
Looks like you are right: both shutdown() and awaitTermination() are necessary. The JavaDoc page (http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html) does not make this too obvious and the methods are pretty poorly named, but the code examples at the top include a shutdownAndAwaitTermination() method that helps clear things up.
Yevgeniy Brikman
@Yevgeniy Brikman, yup, if you edit your answer I'll swap my vote :)
Tim Bender
@Tim Edited the original post.
Yevgeniy Brikman
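For reference, the shutdownAndAwaitTermination() pattern mentioned above looks roughly like this. It is adapted from the usage example in the ExecutorService JavaDoc; the boolean return value and the wrapper class name are additions for this sketch, and the 60-second timeouts are the JavaDoc example's values rather than a recommendation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolShutdownExample {

    // Returns true if the pool terminated, false otherwise.
    static boolean shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // disable new tasks from being submitted
        try {
            // wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // cancel currently executing tasks
                // wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                    return false;
                }
            }
            return true;
        } catch (InterruptedException ie) {
            pool.shutdownNow();                 // (re-)cancel if the current thread was interrupted
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        pool.execute(() -> System.out.println("working"));
        System.out.println(shutdownAndAwaitTermination(pool)); // prints true
    }
}
```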
A: 

In addition to the answers already given, a little Agile refactoring will bring your code fully into Java 5 and make it more robust, reusable, and easier to use.

If you are going to use Executors, you should go all the way and use Futures as well. Change Runnable to Callable, and when you queue up your tasks, store the Future objects in your Vector (this should be a LinkedList, actually). The tasks should not have visibility into the storage mechanism for their results: this is too tightly coupled.

I also recommend not putting type names in your variable names (e.g. vectorList), and not using implementation names to store collections (e.g. Vector -- use the List interface).

This code may be slightly more verbose, but is more loosely coupled and reusable. With a little more refactoring, it could be used for arbitrary tasks.

public class QueryTask implements Callable<Result> {

  private String names;

  public QueryTask(String names) {
    this.names = names;
  }

  public Result call() {
    // do something with names, for example, query database
    Result result = process(names);

    //add result to vectorList
    vectorList.add(result);
  }
}

public static void main(String[] args) {
  ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
  Collection<Future<Result>> results = new LinkedList<Future<Result>>();

  for (int i = 0; i < request.size(); i++) {
    results.add(threadExecutor.submit(new QueryTask(request.get(i).getNames())));
  }

  threadExecutor.awaitTermination();

  // Normally you would see if the Future is done, but
  // the previous statement did that implicitly.
  Collection<Result> finalResults = new LinkedList<Result>();
  for (Future<Result> result : results) {
    finalResults.add(result.get());
  }

  response.setResult(finalResults);
}
John Gaughan
This is terrible and wrong. You still have the reference to vectorList.add(result) which won't compile. You don't call threadExecutor.shutdown(), which will cause the main method to block forever. Aside from that, there is no need to awaitTermination at all since you have a list of futures to process.
Tim Bender
Wow, -1 for applying real-world optimizations that I use at my job to make software suck less... must be some ivory-tower wannabe intellectuals trolling me.
John Gaughan
Actually @John Gaughan, the -1 is because this code doesn't function at all as mentioned in my comment. If you actually read the comment you will realize that there are several mistakes you made. In particular, calling awaitTermination(long, TimeUnit) without calling shutdown(). And, more generally speaking, awaitTermination is an erroneous call that only serves to slow down this code since it suffices to wait for the Future.get() to return.
Tim Bender
First of all, the code is not complete, nor is the original -- I copied the original and made a couple of changes; I never claimed it was more. Second, it is cleaner to stop the executor and then loop over the futures, because there are multiple of them. You can't call get() on a collection of futures and expect it to work.
John Gaughan
@John Gaughan, First where do you stop the executor? hmm? You don't. Second, what are you talking about with the Futures? A `Future` is agnostic of the state of the `ExecutorService` which runs the associated `Callable` and populates it. Why don't you just accept that your answer, while attempting to make some valid points, is wrong and misleading and fix it! Your answer is like textbook examples that don't compile. To an experienced programmer the errors are trivially resolvable, but to a young student they are just another frustration.
Tim Bender
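For comparison, here is a sketch of the Callable/Future approach with the problems raised in these comments addressed: tasks go in with submit() (execute() returns void), each task returns its result instead of touching a shared list, Future.get() does the waiting so awaitTermination() is not needed, and shutdown() runs in a finally block so the pool's threads can exit. The String result type and the body of call() are placeholders assumed for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureExample {

    // The task only computes and returns its result; it knows nothing
    // about where results are stored.
    static class QueryTask implements Callable<String> {
        private final String names;

        QueryTask(String names) {
            this.names = names;
        }

        @Override
        public String call() {
            // Placeholder for the real work, e.g. a database query.
            return "result for " + names;
        }
    }

    static List<String> runAll(List<String> request)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String names : request) {
                futures.add(executor.submit(new QueryTask(names)));
            }

            // get() blocks until its task has finished, so by the end of this
            // loop every result is available; no awaitTermination() needed.
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown(); // lets the pool's threads exit so the JVM can stop
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> request = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            request.add("name" + i);
        }
        System.out.println(runAll(request).size()); // prints 10
    }
}
```

Because the futures are read in submission order, the results come back in the same order as the request, which the shared-Vector version does not guarantee.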
+1  A: 

The easy solution is to add a call to ExecutorService.awaitTermination() after shutdown():

public static void main(String[] args) throws InterruptedException {

     // Assume that I have a request object that contains an ArrayList of names,
     // and vectorList is the container for each request's result

     ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
     Vector<Result> vectorList = new Vector<Result>();

     for (int i = 0; i < request.size(); i++) {
          threadExecutor.execute(new QueryTask(request.get(i).getNames(), vectorList));
     }

     threadExecutor.shutdown();
     threadExecutor.awaitTermination(aReallyLongTime, TimeUnit.SECONDS);

     response.setResult(vectorList);

}
Tim Bender
Actually, that was the culprit. I added awaitTermination per Tim's suggestion, and it works now. Thank you!
javaWorker
The whole process is already fast, about 1 second for 1000 items, but I was being greedy and trying to make it even faster. When I implemented this multithreading approach, the result was not much different from the single-threaded version, since there is an awaitTermination call. Are there any tips for using awaitTermination to make my process faster?
javaWorker
It may be slightly faster to use ExecutorService.submit(Callable) and iterate over the returned Future objects as @John Gaughan suggests, but do NOT call awaitTermination; it is unnecessary. You might also want to read about MapReduce, which is essentially what you are doing here.
Tim Bender
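The ExecutorService API also offers invokeAll(), which submits a whole collection of Callables and returns their Futures only once every task has completed, so neither awaitTermination() nor a separate submit() loop is needed. A minimal sketch, with a hypothetical process(String) standing in for the real query (here it just returns the name's length):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllExample {

    // Hypothetical per-item work standing in for the OP's query.
    static Integer process(String name) {
        return name.length();
    }

    static List<Integer> processAll(List<String> names)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String name : names) {
                tasks.add(() -> process(name));
            }
            // invokeAll blocks until every task has completed and returns
            // the futures in the same order as the tasks.
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // already done, so this does not wait
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> names = new ArrayList<>();
        names.add("a");
        names.add("bb");
        names.add("ccc");
        System.out.println(processAll(names)); // prints [1, 2, 3]
    }
}
```

Whether any of this beats a single thread depends on the work itself: a pool mainly pays off when each task spends time waiting (for example, on the database), and much less when the per-item work is already cheap.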