views: 1621
answers: 3

I have a method that takes an array of queries, and I need to run them against different search engine Web APIs, such as Google's or Yahoo's. To optimize this process, I thought of creating a thread for each query and then joining them all at the end, since my application can only continue after I have the results of every query. My structure is something like this:

public abstract class Query extends Thread {
    private String query;

    public abstract Result[] querySearchEngine(String query);

    @Override
    public void run() {
        Result[] results = querySearchEngine(query);
        Querier.addResults(results);
    }
}

public class GoogleQuery extends Query {
    public Result[] querySearchEngine(String query) {
        // access the Google REST API
    }
}

public class Querier {
    /* Every class that extends Query fills this list */
    private static List<Result> aggregatedResults = new ArrayList<Result>();

    /* Must be thread-safe, since every query thread calls it concurrently */
    public static void addResults(Result[] results) { /* add to aggregatedResults */ }

    public static Result[] queryAll(Query[] queries) throws InterruptedException {
        /* start every thread, then wait for all of them to finish */
        for (Query query : queries) {
            query.start();
        }
        for (Query query : queries) {
            query.join();
        }
        return aggregatedResults.toArray(new Result[aggregatedResults.size()]);
    }
}

I don't really like the call to the static method inside the run() method of the Query class, and I'd like to improve this code.

Recently, I found that there's a "new" API in Java for doing concurrent jobs: namely, the Callable interface, which I believe is quite similar to Runnable but can return a result, plus FutureTask and ExecutorService. I was wondering whether this new API is the one that should be used, and whether it is more efficient than the classic approach.

After studying the "new" API, I came up with this new code (simplified version):

public abstract class Query implements Callable<Result[]> {
    private final String query; // gets set in the constructor

    public abstract Result[] querySearchEngine(String query);

    @Override
    public Result[] call() {
        return querySearchEngine(query);
    }
}

public class Querier {
    private ArrayList<Result> aggregatedResults;

    public Result[] queryAll(Query[] queries) {
        List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length);
        final ExecutorService service = Executors.newFixedThreadPool(queries.length);
        for (Query query : queries) {
            futures.add(service.submit(query));
        }
        for (Future<Result[]> future : futures) {
            aggregatedResults.add(future.get());  // get() is somewhat similar to join?
        }
        return aggregatedResults;
    }
}

This way, I don't need to access static methods, and I think the code ends up better. I'm new to this concurrency API, and I'd like to know whether there's anything that can be improved in the code above, and whether it's better than the first option (using the Thread class). There are some classes I didn't explore, such as FutureTask. I'd love to hear any advice.

Cheers.
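
(For reference, a minimal sketch of how FutureTask fits in, assuming the Callable-based Query and Result types above; the GoogleQuery constructor taking a query string is hypothetical, and exception handling is omitted. A FutureTask wraps a Callable so it can run on a plain Thread and still hand back a result; ExecutorService.submit() does essentially this wrapping for you.)

    FutureTask<Result[]> task = new FutureTask<Result[]>(new GoogleQuery("some query"));
    new Thread(task).start();       // FutureTask is itself a Runnable
    Result[] results = task.get();  // blocks until call() finishes, like join() plus a return value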

+3  A: 

As a further improvement, you could look into using a CompletionService. It decouples submitting tasks from retrieving their results, placing each future result on a queue from which you take results in the order they complete.
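
A minimal sketch of that idea, assuming the Callable-based Query and Result types from the question (pool size is a placeholder and exception handling is omitted):

    ExecutorService executor = Executors.newFixedThreadPool(queries.length);
    CompletionService<Result[]> completionService =
            new ExecutorCompletionService<Result[]>(executor);

    for (Query query : queries) {
        completionService.submit(query);
    }

    List<Result> aggregatedResults = new ArrayList<Result>();
    // take() returns futures in completion order; loop exactly as many times
    // as we submitted, so we never block waiting for a task that was never handed in.
    for (int i = 0; i < queries.length; i++) {
        Result[] results = completionService.take().get();
        aggregatedResults.addAll(Arrays.asList(results));
    }
    executor.shutdown();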

Tim
Since the application can only continue in this case after *every* task is completed, a CompletionService might not be appropriate here.
Avi
@Avi: I disagree; it's just not as nice as future.get().
kd304
@kd304: What method of CompletionService would you use to get all the results of a set of tasks?
Avi
Something like `excCmpSrv.take().get()`, where you have to be careful not to call take() if there aren't any submitted futures left (it'll wait for a new one that never comes). Using poll() or counting the number of submitted Callables is a way of working around this.
Tim
+5  A: 

Several problems with your code.

  1. You should probably be using the ExecutorService.invokeAll() method. The cost of creating new threads and a new thread pool can be significant (though maybe not compared to calling external search engines). invokeAll() can manage the threads for you.
  2. You probably don't want to mix arrays and generics.
  3. You are calling aggregatedResults.add() instead of addAll().
  4. You don't need to use member variables when they could be local to the queryAll() function call.

So, something like the following should work:

public abstract class Query implements Callable<List<Result>> {
    private final String query; // gets set in the constructor

    public abstract List<Result> querySearchEngine(String query);

    @Override
    public List<Result> call() {
        return querySearchEngine(query);
    }
}

public class Querier {
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public List<Result> queryAll(List<Query> queries)
            throws InterruptedException, ExecutionException {
        List<Future<List<Result>>> futures = executor.invokeAll(queries);
        List<Result> aggregatedResults = new ArrayList<Result>();
        for (Future<List<Result>> future : futures) {
            aggregatedResults.addAll(future.get()); // get() blocks for the result, much like join()
        }
        return aggregatedResults;
    }
}
Avi
Changing to a cached thread pool might not be the best option, as your application is I/O-bound; most search engines are really fast and will respond promptly.
kd304
@kd304: Indeed, the search engines I'm using are quite fast (Google and Yahoo, currently). However, I'm using lots of queries, hence the need for concurrency. What is your advice on this? From what I've read in the javadoc of newCachedThreadPool, it seems to fit my purposes. But then again, I'm quite new to this API.
JG
@Avi: Thank you very much for the suggestions!
JG
@JG: Hard to say, as there is no adaptive pool available in Java that would adjust its size based on the I/O-to-CPU ratio. A heuristic approach would be to measure the wait-for-response time, response delivery time, and response processing time, then use a fixed pool size to interleave them. On my 100 Mbit / 2-core machine, optimum performance is achieved with a pool of size 10.
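
A minimal illustration of that suggestion, assuming Avi's Querier above; the size of 10 is just the value measured on that particular setup and would need tuning:

    // Fixed-size pool instead of a cached one; 10 is a placeholder to be tuned
    // against your own measured wait/response/processing times.
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);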
kd304
+3  A: 

Can I suggest you use Future.get() with a timeout?

Otherwise it only takes one unresponsive search engine to bring everything to a halt (and it doesn't even need to be a search engine problem if, say, you have a network issue at your end).
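
A minimal sketch of a timed get(), assuming the Future<List<Result>> loop from the answer above; the 10-second value is only a placeholder:

    for (Future<List<Result>> future : futures) {
        try {
            // Wait at most 10 seconds per query (placeholder value).
            aggregatedResults.addAll(future.get(10, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            // Give up on this search engine and stop the task if it is still running.
            future.cancel(true);
        }
    }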

Brian Agnew
Thanks. What is the typical timeout value used for this kind of operation?
JG
I think you need to ask yourself how long you'd be prepared to wait :-) Make it configurable and set it to (say) 10x the normal response time.
Brian Agnew
I think the right layer in the code for the timeout is not Future.get(); it is the network (HTTP?) call to the search engine itself. If the search engine times out, it's better to catch that there, rather than tie up a thread which is no longer needed.
Avi
That assumes (!) that you're talking HTTP. In the higher, more abstract areas of the code base I wouldn't necessarily make that assumption. However, I think you're right in that setting a timeout on the HTTP operations is always a good idea, and then throwing an appropriate exception. So I would set some timeout in *both* the Future.get() and the HTTP connection. Whether they're the same value is another matter.
Brian Agnew
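
A minimal sketch of the HTTP-level timeouts being discussed, assuming the search engine is called with a plain HttpURLConnection inside a hypothetical querySearchEngine() implementation (requestUrl and the 5- and 10-second values are placeholders):

    HttpURLConnection connection =
            (HttpURLConnection) new URL(requestUrl).openConnection();
    connection.setConnectTimeout(5000);  // fail fast if we cannot even connect
    connection.setReadTimeout(10000);    // fail if the engine stops sending data
    // ... read and parse the response, throwing an appropriate exception on timeout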