I want to create a ThreadPoolExecutor
such that when it has reached its maximum size and the queue is full, the sumbit()
method blocks when trying to add new tasks. Do I need to implement a custom RejectedExecutionHandler
for that or is there an existing way to do this using standard java library?
views:
843answers:
4Is what you want anything like the Array blocking queue's offer() method?
You should use the CallerRunsPolicy
, which executes the rejected task in the calling thread. This way, it can't submit any new tasks to the executor until that task is done, at which point there will be some free pool threads or the process will repeat.
From the docs:
Rejected tasks
New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:
- In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
- In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
- In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
- In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
Also, make sure to use a bounded queue, such as ArrayBlockingQueue, when calling the ThreadPoolExecutor
constructor. Otherwise, nothing will get rejected.
Edit: in response to your comment, set the size of the ArrayBlockingQueue to be equal to the max size of the thread pool and use the AbortPolicy.
Edit 2: Ok, I see what you're getting at. What about this: override the beforeExecute()
method to check that getActiveCount()
doesn't exceed getMaximumPoolSize()
, and if it does, sleep and try again?
One of the possible solutions I've just found (here):
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
Are there any other solutions? I'd prefer something based on RejectedExecutionHandler
since it seems like a standard way to handle such situations.
Create your own blocking queue to be used by the Executor, with the blocking behavior you are looking for, while always returning available remaining capacity (ensuring the executor will not try to create more threads than its core pool, or trigger the rejection handler).
I believe this will get you the blocking behavior you are looking for. A rejection handler will never fit the bill, since that indicates the executor can not perform the task. What I could envision there is that you get some form of 'busy waiting' in the handler. That is not what you want, you want a queue for the executor that blocks the caller...