views:

564

answers:

3

Is there any way to create Executor that will have always at least 5 threads, and maximum of 20 threads, and unbounded queue for tasks (meaning no task is rejected)

I tried new ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) with all possibilities that I thought of for queue:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

and none worked as wanted.

+1  A: 

The javadocs for ThreadPoolExecutor are pretty clear that once corePoolSize threads have been created, new threads will only be created once the queue is full. So if you set core to 5 and max to 20, you'll never get your desired behaviour.

However, if you set both core and max to 20, then tasks will only get added to the queue if all 20 threads are busy. Of course, this renders your "5 threads minimum" requirement a bit meaningless, since all 20 will be kept alive (until they idle out, anyway).

skaffman
"idling out" will never happen, unless you say all core threads can die. In any way, don't see the point of both core and max size if it cannot be used properly. Is there any other class (other than ThreadPoolExecutor), that will meet my requirements?
Sarmun
+3  A: 

Maybe something like this would work for you? I just whipped it up so please poke at it. Basically, it implements an overflow thread pool that is used to feed the underlying ThreadPoolExecutor

There are two major draw backs I see with it:

  • The lack of a returned Future object on submit(). But maybe that is not an issue for you.
  • The secondary queue will only empty into the ThreadPoolExecutor when jobs are submitted. There has got to be an elegant solution, but I don't see it just yet. If you know that there will be a stead stream of tasks into the StusMagicExecutor then this may not be an issue. ("May" being the key word.) An option might to be to have your submitted tasks poke at the StusMagicExecutor after they complete?

Stu's Magic Executor:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}
Stu Thompson
Great, tnx, this is pretty much what I wanted. With wrapping newTasks with runnable that pokes one task after finish, only one thread out of 20 could be idle while we have tasks in secondary queue.
Sarmun
+1  A: 

I think this problem is a shortcoming of the class and very misleading given the constructor parameter combinations. Here's a solution taken from SwingWorker's inner ThreadPoolExecutor that I made into a top level class. It doesn't have a minimum but does at least use an upper bound. The only thing I don't know is what performance hit you get from the locking execute.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
Mauldus