I'm trying to use a ThreadPoolExecutor to schedule tasks, but running into some problems with its policies. Here's its stated behavior:

  1. If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  2. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  3. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

The behavior I want is this:

  1. same as above
  2. If more than corePoolSize but fewer than maximumPoolSize threads are running, the Executor should prefer adding a new thread over queuing, and prefer using an idle thread over adding a new thread.
  3. same as above

Basically I don't want any tasks to be rejected; I want them queued in an unbounded queue. But I also want up to maximumPoolSize threads. If I use an unbounded queue, the pool never creates threads once it reaches corePoolSize. If I use a bounded queue, it rejects tasks. Is there any way around this?

What I'm thinking about now is running the ThreadPoolExecutor on a SynchronousQueue, but not feeding tasks directly to it - instead feeding them to a separate unbounded LinkedBlockingQueue. Then another thread feeds from the LinkedBlockingQueue into the Executor, and if one gets rejected, it simply tries again until it is not rejected. This seems like a pain and a bit of a hack, though - is there a cleaner way to do this?
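For reference, here is a rough sketch of that feeder-thread workaround (the class name and pool sizes are hypothetical, and the retry back-off is one of several possible choices):

```java
import java.util.concurrent.*;

// Tasks land in an unbounded staging queue; a feeder thread retries
// execute() until a worker (idle or newly created) accepts each task.
class FeederPool {
    private final BlockingQueue<Runnable> staging = new LinkedBlockingQueue<>();
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 8, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());  // never queues: forces new threads up to max

    FeederPool() {
        Thread feeder = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = staging.take();
                    while (true) {
                        try {
                            executor.execute(task);  // accepted by an idle or new worker
                            break;
                        } catch (RejectedExecutionException e) {
                            Thread.sleep(10);        // all workers busy: back off, retry
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        feeder.setDaemon(true);
        feeder.start();
    }

    void submit(Runnable task) {
        staging.add(task);
    }
}
```

It does work, but as noted it costs an extra thread plus a busy retry loop.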

A: 

Just set corePoolSize = maximumPoolSize and use an unbounded queue?

In your list of points, 1 excludes 2, since corePoolSize is always less than or equal to maximumPoolSize.

Edit

There is still something incompatible between what you want and what TPE will offer you.

If you have an unbounded queue, maximumPoolSize is ignored so, as you observed, no more than corePoolSize threads will ever be created and used.

So, again, if you set corePoolSize = maximumPoolSize with an unbounded queue, you have what you want, no?
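Concretely, that configuration looks like this (n is a placeholder pool size):

```java
import java.util.concurrent.*;

class FixedPoolExample {
    public static void main(String[] args) throws Exception {
        int n = 4;  // hypothetical thread count
        // corePoolSize == maximumPoolSize with an unbounded queue: the pool
        // grows to n threads, and everything beyond that waits in the queue.
        // This is exactly what Executors.newFixedThreadPool(n) builds.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        Future<Integer> f = pool.submit(() -> 21 * 2);
        System.out.println(f.get()); // prints 42
        pool.shutdown();
    }
}
```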

eljenso
Oops, what I wrote wasn't exactly what I wanted. I edited the original.
Joe K
Setting corePoolSize = maximumPoolSize is indeed close, but I'm also using allowCoreThreadTimeOut(false) and prestartAllCoreThreads().
Joe K
+1  A: 

Would you be looking for something more like a cached thread pool?

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()
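For reference, the cached pool needs no configuration at all, though note its thread count is effectively unbounded:

```java
import java.util.concurrent.*;

class CachedPoolExample {
    public static void main(String[] args) throws Exception {
        // newCachedThreadPool(): zero core threads, SynchronousQueue handoff,
        // so it never queues; it creates threads on demand, reuses idle ones,
        // and lets them die after 60 seconds of idleness.
        ExecutorService pool = Executors.newCachedThreadPool();
        Future<String> f = pool.submit(() -> "done");
        System.out.println(f.get()); // prints done
        pool.shutdown();
    }
}
```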

kyeana
+1  A: 

Your use case doesn't really make sense, and I think part of this is due to a misunderstanding of the definition of "running". A ThreadPoolExecutor considers a worker thread to be "running" as long as it exists; it doesn't have to be actively executing a task. So when you describe your requirements, you are basically saying:

Even if I have corePoolSize workers idly waiting for tasks, still create new worker threads until I have maximumPoolSize workers. Then, and only then, should you start giving my core pool workers tasks to execute.

Once phrased in the terminology used by the ThreadPoolExecutor, I hope you can see why your request doesn't make sense. In order to get what you want, the ThreadPoolExecutor would have to use a Semaphore or some other mechanism to determine how many of the core threads are actually executing tasks, which it does not.

I realize that what you would really like is for additional workers to be created once all the core workers are busy. However, extra workers in a ThreadPoolExecutor were only ever intended as a way to drain the task queue faster. That brings up the other common misunderstanding: extra workers beyond corePoolSize are not terminated the moment they go idle; they keep polling the task queue for the configured keep-alive timeout. If you don't want to use:

Executors.newCachedThreadPool();

Then your best bet is to set:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

Doing this will cause your ThreadPoolExecutor to always create a new worker while it is under the maximum number of workers allowed. Only when the maximum number of workers are "running" will idle workers be given tasks. If a worker waits aReasonableTimeDuration without receiving a task, it is allowed to terminate. This of course means you won't gain the real benefits of pooling threads: many of your threads may die having only ever executed a single task.
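Putting that recipe together (N and the keep-alive duration below are placeholder values):

```java
import java.util.concurrent.*;

class EagerPoolExample {
    public static void main(String[] args) throws Exception {
        int N = 8;  // hypothetical maximum worker count
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                N, N,                   // corePoolSize == maximumPoolSize
                30L, TimeUnit.SECONDS,  // "aReasonableTimeDuration"
                new LinkedBlockingQueue<Runnable>());
        // Let even core workers terminate after the keep-alive elapses.
        executor.allowCoreThreadTimeOut(true);

        CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            executor.execute(latch::countDown);
        }
        latch.await();
        executor.shutdown();
    }
}
```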

Tim Bender
+1  A: 

Your use case is common, completely legit and unfortunately more difficult than one would expect. For background info you can read this discussion and find a pointer to a solution (also mentioned in the thread) here. Shay's solution works fine.
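A commonly cited shape for this kind of solution (the class names here are mine, and this may differ in detail from the one linked above) is a queue whose offer() reports "full" while the pool can still grow, forcing the executor to create threads up to maximumPoolSize first, with a RejectedExecutionHandler that really enqueues the task when no thread could be added:

```java
import java.util.concurrent.*;

// offer() returns false while more workers can be created, so the
// executor spawns threads before queuing; the rejection handler below
// then actually enqueues tasks once the pool is at maximum size.
class ScalingQueue extends LinkedBlockingQueue<Runnable> {
    private volatile ThreadPoolExecutor executor;

    void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public boolean offer(Runnable r) {
        // Pretend to be full while the pool can still grow.
        if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
            return false;
        }
        return super.offer(r);
    }
}

class ScalingPoolDemo {
    public static void main(String[] args) throws Exception {
        ScalingQueue queue = new ScalingQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 8, 60L, TimeUnit.SECONDS, queue,
                (task, exec) -> {         // rejected = queue claimed "full"
                    try {
                        queue.put(task);  // fall back to real (unbounded) queuing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
        queue.setExecutor(pool);

        CountDownLatch latch = new CountDownLatch(20);
        for (int i = 0; i < 20; i++) {
            pool.execute(latch::countDown);
        }
        latch.await();
        pool.shutdown();
    }
}
```

Note there is a small race between offer() returning false and the executor's own pool-size check; the rejection handler papers over it by enqueuing the task anyway.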

Generally I'd be a bit wary of unbounded queues; it's usually better to have explicit incoming flow control that degrades gracefully and regulates the ratio of current to remaining work, so that neither producer nor consumer is overwhelmed.

Holger Hoffstätte