views:

77

answers:

3

I want to implement something like this.

1.A background process which will be running forever

2.The background process will check the database for any requests in pending state. If any found,will assign a separate thread to process the request.So one thread per request.Max threads at any point of time should be 10. Once the thread has finished execution,the status of the request will be updated to something,say "completed".

My code outline looks something like this.

public class SimpleDaemon {
private static final int MAXTHREADS = 10;

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(MAXTHREADS);
    RequestService requestService = null; //init code omitted
    while(true){
        List<Request> pending = requestService.findPendingRequests();
        List<Future<MyAppResponse>> completed = new ArrayList<Future<MyAppResponse>>(pending.size());
        for (Request req:pending) {
            Callable<MyAppResponse> worker = new MyCallable(req);
            Future<MyAppResponse> submit = executor.submit(worker);
            completed.add(submit);
        }

        // Now retrieve the result
        for (Future<MyAppResponse> future : completed) {
            try {
                requestService.updateStatus(future.getRequestId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(10000); // Sleep sometime
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
 }

Can anyone spend sometime to review this and comment any suggestion/optimization (from multi threading perspective) ? Thanks.

A: 

You need to handle the potential throwing of a RejectedExecutionException by executor.submit() because the thread-pool has a finite number of threads.

You'd probably be better off using an ExecutorCompletionService rather than an ExecutorService because the former can tell you when a task completes.

I strongly recommend reading Brian Goetz's book "Java Concurrency in Practice".

Steve Emmerson
-1: RejectedExecutionException is only thrown when the internal task queue is bounded or the executor has been shutdown. Executors.newFixedThreadPool uses a LinkedList (unbounded).
Tim Bender
I meant LinkedBlockingQueue ... but the point still stands.
Tim Bender
+1  A: 

Using a max threads of ten seems somewhat arbitrary. Is this the maximum available connections to your database?

I'm a little confused as to why you are purposefully introducing latency into your applications. Why aren't pending requests submitted to the Executor immediately?
The task submitted to the Executor could then update the RequestService, or you could have a separate worker Thread belonging to the RequestService which calls poll on a BlockingQueue of Future<MyAppResponse>.

You have no shutdown/termination strategy. Nothing indicates that main is run on a Thread that is set to Daemon. If it is, I think the ExecutorService's worker threads will inherit the daemon status, but then your application could shutdown with live connection to the database, no? Isn't that bad?
If the thread isn't really a Daemon, then you need to handle that InterruptedException and treat it as an indication that you are being asked to exit the application.

Tim Bender
A: 

Your calls to requestService appear to be single threaded resulted in any long running queries preventing completed queries from being completed.

Unless the updateStatus has to be called in a specific order, I suggest you call this as part of your query in MyCallable. This could simplify your code and allow results to be processed as they become available.

Peter Lawrey
Yes. Your second point makes sense to me. I will correct it. BTW, do you care explaining a bit more about your first point.?
Your loop through the Future objects in a fixed order. Imagine the first task is slow and the others complete quickly, you cannot process the results you have until the first one completes. If each thread handles its own results, you don't have to wait and you can be processing the results while other threads are still generating them.
Peter Lawrey