Hi,

I am using ExecutorService to simplify a concurrent, multithreaded program. Take the following code:


while (xxx) {
 ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
 ...  
 Future<..> ... = exService.submit(..);
 ...
}

My problem is that submit() does not block when all NUMBER_THREADS threads are busy. As a result, the task queue gets flooded with tasks, and shutting down the executor with ExecutorService.shutdown() takes ages (ExecutorService.isTerminated() stays false for a long time) because the task queue is still quite full.

For now my workaround is to use a semaphore to keep too many entries from piling up in the ExecutorService task queue:


...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while (xxx) {
 ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
 ...
 semaphore.acquire();
 // internally the task calls a finish callback, which invokes semaphore.release()
 // -> now another task is added to queue
 Future<..> ... = exService.submit(..); 
 ...
}

I am sure there is a better, more encapsulated solution?

+3  A: 

You're better off creating the ThreadPoolExecutor yourself (which is what Executors.newXXX() does anyway).

In the constructor, you can pass in a BlockingQueue for the Executor to use as its task queue. If you pass in a size-constrained BlockingQueue (like a LinkedBlockingQueue created with a capacity), it should achieve the effect you want.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
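
As a rough, self-contained sketch of this setup (the class and method names are made up for illustration): note that with the executor's default rejection handler, a submission that finds both the pool and the bounded queue full is rejected with a RejectedExecutionException rather than blocking. The sketch counts how often that happens.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Hypothetical helper: submits `attempts` tasks that all wait on a latch,
    // and returns how many submissions were rejected.
    static int countRejections(int threads, int queueSize, int attempts) {
        ExecutorService exService = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    exService.submit(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // pool and queue were both full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            exService.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 tasks run, 3 wait in the queue, the remaining 5 are rejected.
        System.out.println("rejected: " + countRejections(2, 3, 10));
    }
}
```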
Kevin
I see. I overlooked a detail at http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29. It mentions that an unbounded queue is used by default.
manuel aldana
I tried it out. Unfortunately your solution does not block (as I want) but throws a RejectedExecutionException. I also found: http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html. The workarounds presented there seem more complicated than my semaphore example, damn!
manuel aldana
A: 

You can call ThreadPoolExecutor.getQueue().size() to find out the size of the waiting queue and take action if the queue is too long. I suggest running the task in the current thread if the queue is too long, to slow down the producer (if that is appropriate).
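
A minimal sketch of that idea, assuming a hypothetical helper submitOrRunInline and a caller-chosen threshold maxQueued (neither is part of the JDK). The queue-size check is only approximate under concurrency, which is usually acceptable for throttling. A built-in alternative along the same lines is a bounded queue combined with ThreadPoolExecutor.CallerRunsPolicy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {

    // Hypothetical helper: hand the task to the pool unless too many tasks
    // are already waiting, in which case the producer runs it itself.
    static void submitOrRunInline(ThreadPoolExecutor ex, Runnable task, int maxQueued) {
        if (ex.getQueue().size() >= maxQueued) {
            task.run(); // slows the producer down
        } else {
            ex.execute(task);
        }
    }

    // Demo: returns how many of `tasks` ended up running in the calling thread.
    static int countInlineRuns(int tasks, int maxQueued) {
        ThreadPoolExecutor ex = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch gate = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        AtomicInteger inline = new AtomicInteger();

        // Occupy the single worker so that later tasks pile up in the queue.
        ex.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int i = 0; i < tasks; i++) {
            submitOrRunInline(ex, () -> {
                if (Thread.currentThread() == caller) {
                    inline.incrementAndGet();
                }
            }, maxQueued);
        }

        gate.countDown(); // release the worker so queued tasks can drain
        ex.shutdown();
        return inline.get();
    }

    public static void main(String[] args) {
        System.out.println("ran inline: " + countInlineRuns(5, 2));
    }
}
```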

Peter Lawrey
A: 

A true blocking ThreadPoolExecutor has been on many people's wishlists; there's even a JDC bug open for it. I'm facing the same problem and came across this: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

It's an implementation of a BlockingThreadPoolExecutor, built on a rejection policy (a RejectedExecutionHandler) that uses offer to add the task to the queue, waiting for the queue to have room. It looks good.
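
To illustrate the general technique (this is not the linked article's code, just a minimal sketch with made-up names such as BlockWhenFullPolicy), a rejection handler can block the submitting thread until the bounded queue has room. This variant uses put() rather than a timed offer(). One caveat of this approach: a task enqueued while the executor is concurrently shut down may never run.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {

    // Hypothetical policy: instead of throwing, wait for space in the queue.
    static class BlockWhenFullPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                executor.getQueue().put(task); // blocks the submitting thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        }
    }

    static ThreadPoolExecutor newBlockingPool(int threads, int queueSize) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize),
                new BlockWhenFullPolicy());
    }

    // Demo: submits `tasks` short tasks and returns how many completed.
    static int runAll(int tasks, int threads, int queueSize) {
        ThreadPoolExecutor ex = newBlockingPool(threads, queueSize);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            ex.execute(() -> {
                try {
                    Thread.sleep(5); // simulate work so the queue fills up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        ex.shutdown(); // the queue holds at most queueSize tasks here
        try {
            ex.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(20, 2, 2));
    }
}
```

Because the producer waits inside execute(), at most queueSize tasks are pending when shutdown() is called, so termination does not have to drain a long backlog.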

mdma