I like the ExecutorService family of classes and interfaces. I don't have to worry about threads: I take in an ExecutorService instance and use it to schedule tasks. Whether it uses an 8-thread or a 16-thread pool is not my concern; that depends entirely on how the ExecutorService is set up. Hurray!

But what do I do if some of my tasks need to be executed in serial order? Ideally I would ask the ExecutorService to let me schedule these tasks on a single thread, but there doesn't seem to be any means of doing so.

edit: The tasks are not known ahead of time, they are an unlimited series of tasks that are erratically generated by events of various kinds (think random / unknown arrival process: e.g. clicks of a Geiger counter, or keystroke events).

+6  A: 

You could write an implementation of Runnable that takes some tasks and executes them serially.

Something like:

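import java.util.List;

// Runs a fixed list of tasks one after another on whichever thread executes it.
public class SerialRunner implements Runnable {
    private final List<Runnable> tasks;

    public SerialRunner(List<Runnable> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        // Tasks run in list order; an exception from one task stops the rest.
        for (Runnable task : tasks) {
            task.run();
        }
    }
}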
danben
This won't work: the tasks are an unlimited series, enqueued one at a time.
Jason S
In that case, and if their order is not important with respect to the other tasks you are executing, use `Executors.newSingleThreadExecutor()`.
danben
+1: this answer's not what i wanted, but it gives me an idea....
Jason S
+2  A: 

I'm using a separate executor created with Executors.newSingleThreadExecutor() for tasks that I want to queue up and run only one at a time. Another approach is to compose several tasks into one and submit that:

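executor.submit(new Callable<Void>() {
    public Void call() throws Exception {
        // Callable.call() may throw a checked exception, so the wrapper is
        // a Callable too; a failure surfaces through the returned Future.
        myTask1.call();
        myTask2.call();
        myTask3.call();
        return null;
    }});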

Though you might need something more elaborate if you still want myTask2 to run even if myTask1 throws an exception.
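One way to handle that case is to catch each task's exception separately and keep going. The following is only a minimal sketch under that assumption; the class name ContinueOnErrorBatch is made up for illustration, and it simply collects the failures rather than deciding what to do with them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical helper (not from the answer above): runs the given tasks in
// order on whichever thread executes it, and keeps going when one throws.
class ContinueOnErrorBatch implements Callable<List<Exception>> {
    private final List<Callable<?>> tasks;

    ContinueOnErrorBatch(List<? extends Callable<?>> tasks) {
        // Defensive copy so later changes to the caller's list have no effect.
        this.tasks = new ArrayList<Callable<?>>(tasks);
    }

    // Returns the exceptions thrown by the tasks, in the order they occurred.
    @Override
    public List<Exception> call() {
        List<Exception> failures = new ArrayList<Exception>();
        for (Callable<?> task : tasks) {
            try {
                task.call();
            } catch (Exception e) {
                // Record the failure and move on to the next task.
                failures.add(e);
            }
        }
        return failures;
    }
}
```

It would be submitted like any other task, e.g. executor.submit(new ContinueOnErrorBatch(tasks)), and the returned Future then yields the list of failures.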

nos
Using a separate executor would work, but it's a constraint I would like to avoid if possible.
Jason S
+1  A: 

The way I do this is with some homegrown code that streams work onto different threads according to the key each task reports (the key can be completely arbitrary or a meaningful value). Instead of offering to a Queue and having some other thread(s) take work off it (or, in your case, lodging work with the ExecutorService and having the service maintain a thread pool that takes work off its internal queue), you offer a Pipelineable (aka a task) to the PipelineManager, which locates the right queue for that task's key and puts the task on that queue. Assorted other code manages the threads that take work off the queues, to ensure there is always one and only one thread per queue; that guarantees all work offered for the same key is executed serially.

Using this approach you could easily set aside certain keys for n sets of serial work while round-robining over the remaining keys for work that can run in any order. Alternatively, you can keep certain pipes (threads) hot by judicious key selection.
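To make the key-to-pipe idea concrete, here is a rough sketch. It is not the homegrown code described above: the class KeyedPipes and its methods are invented for illustration, and it assumes each pipe can simply be a single-threaded executor selected by the key's hash code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: each "pipe" is a single-threaded executor, and a
// task's key decides which pipe it lands on.
class KeyedPipes {
    private final ExecutorService[] pipes;

    KeyedPipes(int pipeCount) {
        pipes = new ExecutorService[pipeCount];
        for (int i = 0; i < pipeCount; i++) {
            pipes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Equal keys always map to the same pipe, so their tasks run in
    // submission order; tasks with different keys may run in parallel.
    void submit(Object key, Runnable task) {
        int index = (key.hashCode() & Integer.MAX_VALUE) % pipes.length;
        pipes[index].execute(task);
    }

    // Stops accepting work and waits for queued tasks to finish.
    // Returns false on timeout or interruption.
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService pipe : pipes) {
            pipe.shutdown();
        }
        try {
            for (ExecutorService pipe : pipes) {
                if (!pipe.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Unrelated keys that happen to hash to the same pipe are serialised with each other as well, which costs some parallelism but never breaks the per-key ordering.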

This approach is not feasible with the JDK ExecutorService implementations because each is backed by a single BlockingQueue (at least ThreadPoolExecutor is), so there's no way to say "do this work in any old order, but this work must be serialised". I'm assuming you want that in order to maintain throughput, of course; otherwise just put everything onto a single-thread executor, as per danben's comment.

(edit)

What you could do instead, to maintain the same abstraction, is create your own implementation of ExecutorService that delegates to as many instances of ThreadPoolExecutor (or similar) as you need: one backed by n threads and one or more single-threaded instances. Something like the following (which is in no way working code, but hopefully you get the idea!):

public class PipeliningExecutorService<T extends Pipelineable> implements ExecutorService {
    // single-threaded executors for the keys that need serial execution
    private Map<Key, ExecutorService> executors;
    // multi-threaded pool for everything else
    private ExecutorService generalPurposeExecutor;

    // ExecutorService methods here, for example
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // convertTaskToPipelineable is left unimplemented in this outline
        Pipelineable pipelineableTask = convertTaskToPipelineable(task);
        Key taskKey = pipelineableTask.getKey();
        ExecutorService delegatedService = executors.get(taskKey);
        // no dedicated executor for this key: fall back to the shared pool
        if (delegatedService == null)  delegatedService = generalPurposeExecutor;
        return delegatedService.submit(task);
    }
}

public interface Pipelineable<K,V> {
    K getKey();
    V getValue();
}

It's pretty ugly, for this purpose, that the ExecutorService methods are generic rather than the service itself. It means you need some standard way to marshal whatever gets passed in into a Pipelineable, plus a fallback for when you can't (e.g. throw it onto the general-purpose pool).
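A cut-down, runnable variation on that outline is sketched below. It sidesteps the marshalling problem by dropping the ExecutorService interface and taking the key as an explicit argument. Everything here (the class KeyRoutingSubmitter, the use of String keys, the constructor arguments) is an assumption made for illustration only.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: routes tasks with a reserved key to that key's
// single-threaded executor and everything else to a shared pool.
class KeyRoutingSubmitter {
    private final Map<String, ExecutorService> serialExecutors =
            new ConcurrentHashMap<String, ExecutorService>();
    private final ExecutorService generalPurposeExecutor;

    KeyRoutingSubmitter(int poolSize, String... serialKeys) {
        generalPurposeExecutor = Executors.newFixedThreadPool(poolSize);
        for (String key : serialKeys) {
            serialExecutors.put(key, Executors.newSingleThreadExecutor());
        }
    }

    // A null or unreserved key means "any order is fine".
    <T> Future<T> submit(String key, Callable<T> task) {
        ExecutorService delegate = (key == null) ? null : serialExecutors.get(key);
        if (delegate == null) {
            delegate = generalPurposeExecutor;
        }
        return delegate.submit(task);
    }

    // Lets queued tasks finish but accepts no new ones.
    void shutdown() {
        generalPurposeExecutor.shutdown();
        for (ExecutorService executor : serialExecutors.values()) {
            executor.shutdown();
        }
    }
}
```

The trade-off is that callers must now know about keys, which is exactly the abstraction leak a full ExecutorService implementation would hide.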

Matt
A: 

Hmm, I thought of something. I'm not quite sure if this will work, but maybe it will (untested code). This skips over subtleties (exception handling, cancellation, fairness to other tasks of the underlying Executor, etc.) but may be useful.

 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;

 class SequentialExecutorWrapper implements Runnable
 {
     final private ExecutorService executor;

     // queue of tasks to execute in sequence
     final private Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

     // flag guarding the task queue: true while some thread is draining it
     final private AtomicBoolean taskInProcess = new AtomicBoolean(false);

     public SequentialExecutorWrapper(ExecutorService executor)
     {
         this.executor = executor;
     }

     public void submit(Runnable task)
     {
         // add task to the queue, then ask the underlying executor to drain it;
         // if another thread is already draining, that thread picks the task up
         taskQueue.offer(task);
         executor.execute(this);
     }

     public void run()
     {
         tryToRunNow();
     }

     private boolean tryToRunNow()
     {
         boolean ranTasks = false;
         // Re-check the queue after releasing the flag: a task offered between
         // the last poll() and set(false) would otherwise be stranded.
         while (!taskQueue.isEmpty() && taskInProcess.compareAndSet(false, true))
         {
             // yay! I own the task queue!
             try {
                 Runnable task = taskQueue.poll();
                 while (task != null)
                 {
                     task.run();
                     task = taskQueue.poll();
                 }
             }
             finally
             {
                 taskInProcess.set(false);
             }
             ranTasks = true;
         }
         return ranTasks;
     }
 }
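For comparison, the Javadoc for java.util.concurrent.Executor includes a SerialExecutor example built on the same idea: queue the tasks, and hand the next one to the underlying executor only when the previous one has finished. The sketch below follows that pattern; treat it as an illustration rather than a drop-in replacement for the class above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Sketch modelled on the SerialExecutor example in the Executor Javadoc:
// tasks run one at a time, in submission order, on the delegate's threads.
class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    private final Executor delegate;
    private Runnable active; // task currently handed to the delegate, if any

    SerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(final Runnable task) {
        // Wrap the task so that finishing it (normally or by throwing)
        // schedules the next queued task.
        tasks.add(new Runnable() {
            public void run() {
                try {
                    task.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            delegate.execute(active);
        }
    }
}
```

Because only one wrapped task is ever handed to the delegate at a time, the shared pool stays free for other work between the serial tasks, and several SerialExecutor instances can share one pool.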
Jason S