views:

542

answers:

3

I'm wrestling with the best way to implement my processing pipeline.

My producers feed work to a BlockingQueue. On the consumer side, I poll the queue, wrap what I get in a Runnable task, and submit it to an ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

This is not ideal; the ExecutorService has its own queue, of course, so what's really happening is that I'm always fully draining my work queue and filling the task queue, which slowly empties as the tasks complete.

I realize that I could queue tasks at the producer end, but I'd really rather not do that - I like the indirection/isolation of my work queue being dumb strings; it really isn't any business of the producer what's going to happen to them. Forcing the producer to queue a Runnable or Callable breaks an abstraction, IMHO.

But I do want the shared work queue to represent the current processing state. I want to be able to block the producers if the consumers aren't keeping up.

I'd love to use Executors, but I feel like I'm fighting their design. Can I partially drink the Kool-Aid, or do I have to gulp it? Am I being wrong-headed in resisting queueing tasks? (I suspect I could set up ThreadPoolExecutor to use a 1-task queue and override its execute method to block rather than reject-on-queue-full, but that feels gross.)
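For what it's worth, that blocking variant can be had without overriding execute(): a RejectedExecutionHandler can re-insert the rejected task with a blocking put() on the executor's own queue. A rough sketch, with placeholder pool and queue sizes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitExample {

    // A pool whose submitters block (rather than get a
    // RejectedExecutionException) when the task queue is full.
    static ThreadPoolExecutor newBlockingPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                (r, executor) -> {
                    try {
                        // Block until there is room in the queue.
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException("interrupted", e);
                    }
                });
    }
}
```

Whether this is less gross than subclassing is debatable, and it has a known wrinkle: a task re-queued by the handler after shutdown() may never run. But it does give producers back-pressure without touching execute().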

Suggestions?

+4  A: 

I want the shared work queue to represent the current processing state.

Try using a shared bounded BlockingQueue and have a pool of Worker threads taking work items off the queue.

I want to be able to block the producers if the consumers aren't keeping up.

Both ArrayBlockingQueue and LinkedBlockingQueue can be constructed with a bounded capacity; their blocking put() method then guarantees that producers block whenever the queue is full.
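A tiny demonstration of that bound (a capacity of two, chosen arbitrarily): the non-blocking offer() returns false exactly when put() would block the caller.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {

    public static boolean fillThenOffer() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(2);
        q.put("a");          // succeeds immediately; queue has room
        q.put("b");          // succeeds; queue is now full
        return q.offer("c"); // false: a put() here would block instead
    }
}
```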

Here is a rough start. You can tune the number of workers and queue size:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
Kevin
Thanks; my previous implementation was a lot like this, although it just used a ThreadFactory - once you reduce it to a fixed set of threads that all try to drain the work queue, there's little point in using ExecutorService any more. I was switching to ExecutorService in order to take advantage of a more tunable thread pool, with the semantics of "find an available existing worker thread if one exists, create one if necessary, kill them if they go idle."
Jolly Roger
The Executors.newCachedThreadPool() will do something similar to that. You can also really tweak the pool policy on ThreadPoolExecutor itself. What is it that you're after?
Kevin
That's the idea... it can be tuned exactly the way I like, if I were willing to use its task work queue. What I really want is to carve out the thread pool smarts from the executor and implement my own thread pool client, but it isn't really set up for that.
Jolly Roger
Why aren't you willing to use its task work queue?
Kevin
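For reference, the "reuse an existing worker if one exists, create one if necessary, kill them when idle" behaviour discussed above maps directly onto ThreadPoolExecutor's constructor arguments. A rough sketch with an upper bound on pool size (the cap and the 60-second idle timeout are placeholder values):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TunablePoolDemo {

    // Like Executors.newCachedThreadPool(), but capped at maxThreads:
    // core size 0 means no threads are kept alive when idle past the
    // timeout; the SynchronousQueue hands each task straight to a worker.
    public static ThreadPoolExecutor newCappedCachedPool(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}
```

One caveat: with a SynchronousQueue, execute() rejects (rather than queues) new tasks once all maxThreads workers are busy, so a saturation policy still has to be chosen.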
A: 

You could have your consumer execute the tasks directly (i.e., call run() itself) instead of handing them off to a thread pool. Combine this with a blocking queue that has a maximum size and I think you will get what you want. Your consumer becomes a worker, executing tasks inline based on the work items on the queue. It will only dequeue items as fast as it processes them, so your producers will block when your consumers stop keeping up.

D.Shawley
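A minimal sketch of this inline-execution idea, assuming a single consumer loop (the class and method names are illustrative, not from the original post):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InlineConsumer {
    private final BlockingQueue<Runnable> queue;

    public InlineConsumer(int capacity) {
        queue = new ArrayBlockingQueue<Runnable>(capacity);
    }

    // Producers block here once 'capacity' tasks are pending.
    public void submit(Runnable task) throws InterruptedException {
        queue.put(task);
    }

    // Consumer loop body: runs each task inline, so items are
    // dequeued no faster than they are processed.
    public void drainOne() throws InterruptedException {
        queue.take().run();
    }
}
```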
That will only give one worker, and I'd like to tune it to maximize processing for a given system configuration. Processing jobs will last between a fraction of a second and tens of seconds, and be a mix of I/O-bound and CPU-bound work.
Jolly Roger
I was assuming multiple consumers associated with one or more producers. If you only have a single consumer, then why not have the producer dump the work directly into the executor?
D.Shawley
A: 

"find an available existing worker thread if one exists, create one if necessary, kill them if they go idle."

Managing all those worker states is as unnecessary as it is perilous. I would create one monitor thread that constantly runs in the background, whose only task is to fill up the queue and spawn consumers... why not make the worker threads daemons so they die as soon as they complete? If you attach them all to one ThreadGroup you can dynamically resize the pool... for example:

  for (int i = 0; i < queue.size() && threadGroup.activeCount() < UPPER_LIMIT; i++) {
      spawnDaemonWorkers(queue.poll());
  }
Melissa