views:

843

answers:

4

I want to create a ThreadPoolExecutor such that when it has reached its maximum size and the queue is full, the sumbit() method blocks when trying to add new tasks. Do I need to implement a custom RejectedExecutionHandler for that or is there an existing way to do this using standard java library?

A: 

Is what you want anything like the Array blocking queue's offer() method?

extraneon
Yes, something like that.
Shooshpanchick
+2  A: 

You should use the CallerRunsPolicy, which executes the rejected task in the calling thread. This way, it can't submit any new tasks to the executor until that task is done, at which point there will be some free pool threads or the process will repeat.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

From the docs:

Rejected tasks

New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
  2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
  3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
  4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

Also, make sure to use a bounded queue, such as ArrayBlockingQueue, when calling the ThreadPoolExecutor constructor. Otherwise, nothing will get rejected.

Edit: in response to your comment, set the size of the ArrayBlockingQueue to be equal to the max size of the thread pool and use the AbortPolicy.

Edit 2: Ok, I see what you're getting at. What about this: override the beforeExecute() method to check that getActiveCount() doesn't exceed getMaximumPoolSize(), and if it does, sleep and try again?

danben
I want to have number of concurrently executed tasks to be strictly bounded (by the number of threads in Executor), this is why I can't allow caller threads to execute these tasks themselves.
Shooshpanchick
AbortPolicy would cause caller thread to receive a RejectedExecutionException, while I need it to just block.
Shooshpanchick
I'm asking for blocking, not sleep)
Shooshpanchick
+4  A: 

One of the possible solutions I've just found (here):

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Are there any other solutions? I'd prefer something based on RejectedExecutionHandler since it seems like a standard way to handle such situations.

Shooshpanchick
When you catch RejectedExecutionException, should this not be re-thrown after the semaphore.release(); such that the caller still finds out about the problem?
Timothy Pratley
Yes, I agree. Edited.
Shooshpanchick
A: 

Create your own blocking queue to be used by the Executor, with the blocking behavior you are looking for, while always returning available remaining capacity (ensuring the executor will not try to create more threads than its core pool, or trigger the rejection handler).

I believe this will get you the blocking behavior you are looking for. A rejection handler will never fit the bill, since that indicates the executor can not perform the task. What I could envision there is that you get some form of 'busy waiting' in the handler. That is not what you want, you want a queue for the executor that blocks the caller...

Fried Hoeben
`ThreadPoolExecutor` uses `offer` method to add tasks to queue. If I created a custom `BlockingQueue` that blocks on `offer`, that would break `BlockingQueue`s contract.
Shooshpanchick