The way I do this is via some homegrown code that streams work onto different threads according to what the task says its key is (this can be completely arbitrary or a meaningful value). Instead of offering to a Queue and having some other thread(s) take work off it (or, in your case, lodging work with the ExecutorService and having the service maintain a thread pool that takes work off its internal queue), you offer a Pipelineable (aka a task) to the PipelineManager, which locates the right queue for the key of that task and sticks the task onto that queue. There is assorted other code that manages the threads taking work off the queues to ensure you always have one and only one thread per queue, which guarantees that all work offered for the same key is executed serially.
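To make the routing idea concrete, here is a minimal sketch. It is not my actual code: `KeyedTask` and `SimplePipelineManager` are hypothetical names, and instead of hand-managed queues and threads it leans on one JDK single-threaded executor per pipe, which gives the same "one and only one thread per queue" property.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical task type: the key decides which pipe the task runs on. */
interface KeyedTask extends Runnable {
    Object getKey();
}

/** Hypothetical manager: each pipe is a single-threaded executor with its own queue. */
final class SimplePipelineManager {
    private final List<ExecutorService> pipes = new ArrayList<>();

    SimplePipelineManager(int pipeCount) {
        for (int i = 0; i < pipeCount; i++) {
            pipes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Same key -> same pipe -> same thread, so tasks for one key run in the order offered. */
    Future<?> offer(KeyedTask task) {
        int index = Math.floorMod(task.getKey().hashCode(), pipes.size());
        return pipes.get(index).submit(task);
    }

    /** Stops accepting work and waits (up to 10 seconds per pipe) for queued tasks to finish. */
    void shutdownAndWait() {
        for (ExecutorService pipe : pipes) {
            pipe.shutdown();
        }
        try {
            for (ExecutorService pipe : pipes) {
                pipe.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Note that hashing means two different keys can share a pipe. That never breaks per-key ordering, it only reduces parallelism between those two keys.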
Using this approach you could easily set aside certain keys for n sets of serial work while round-robining over the remaining keys for the work that can run in any old order. Alternatively, you can keep certain pipes (threads) hot by judicious key selection.
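One possible way to express that key policy is sketched below. `PipeSelector` is a hypothetical helper, and the layout (reserved keys get the first pipes, everything else rotates over the rest) is just an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that maps a task key to a pipe index. */
final class PipeSelector {
    private final List<String> reservedKeys; // pipe i is dedicated to reservedKeys.get(i)
    private final int sharedPipes;           // number of pipes shared by all other work
    private final AtomicInteger next = new AtomicInteger();

    PipeSelector(List<String> reservedKeys, int sharedPipes) {
        if (sharedPipes <= 0) {
            throw new IllegalArgumentException("need at least one shared pipe");
        }
        this.reservedKeys = new ArrayList<>(reservedKeys);
        this.sharedPipes = sharedPipes;
    }

    /** Total number of pipes the caller must provide. */
    int pipeCount() {
        return reservedKeys.size() + sharedPipes;
    }

    int pipeFor(String key) {
        int reserved = reservedKeys.indexOf(key);
        if (reserved >= 0) {
            return reserved; // serial work: always the same pipe
        }
        // Unordered work: rotate over the pipes that follow the reserved ones.
        return reservedKeys.size() + Math.floorMod(next.getAndIncrement(), sharedPipes);
    }
}
```

With reserved keys `billing` and `audit` plus three shared pipes, `billing` always maps to pipe 0 and `audit` to pipe 1, while unreserved work cycles through pipes 2, 3 and 4.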
This approach is not feasible with the JDK ExecutorService implementations because they're backed by a single BlockingQueue (at least a ThreadPoolExecutor is), and hence there's no way to say "do this work in any old order but this work must be serialised". I am assuming you want that, of course, in order to maintain throughput; otherwise just stick everything onto a single-threaded executor (Executors.newSingleThreadExecutor()) as per danben's comment.
(edit)
What you could do instead, to maintain the same abstraction, is create your own implementation of ExecutorService that delegates to as many instances of ThreadPoolExecutor (or similar) as you need: one backed by n threads and one or more single-threaded instances. Something like the following (which is in no way working code, but hopefully you get the idea!):
    public class PipeliningExecutorService<K> implements ExecutorService {
        // One single-threaded executor per key whose work must run serially.
        private Map<K, ExecutorService> executors;
        // Pool of n threads for work that can run in any order.
        private ExecutorService generalPurposeExecutor;

        // ExecutorService methods here, for example
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            // convertTaskToPipelineable is left undefined: it stands for
            // whatever marshalling you use to find the task's key.
            Pipelineable<K, ?> pipelineableTask = convertTaskToPipelineable(task);
            K taskKey = pipelineableTask.getKey();
            ExecutorService delegatedService = executors.get(taskKey);
            if (delegatedService == null) delegatedService = generalPurposeExecutor;
            return delegatedService.submit(task);
        }
    }

    public interface Pipelineable<K, V> {
        K getKey();
        V getValue();
    }
It's pretty ugly, for this purpose, that the ExecutorService methods are generic rather than the service itself, which means you need some standard way to marshal whatever gets passed in into a Pipelineable, and a fallback if you can't (e.g. throw it onto the general-purpose pool).
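For what it's worth, here is a compilable take on the same idea, as a sketch under a few assumptions: `Keyed` and `KeyRoutingExecutorService` are hypothetical names, the "marshalling" is reduced to an `instanceof` check, and it extends the JDK's AbstractExecutorService so that only the routing and lifecycle methods need writing. Anything that is not `Keyed`, or whose key has no dedicated executor, falls back to the general-purpose pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical marker: a task that knows which key it belongs to. */
interface Keyed {
    Object getKey();
}

final class KeyRoutingExecutorService extends AbstractExecutorService {
    private final Map<Object, ExecutorService> serialExecutors; // each should be single-threaded
    private final ExecutorService generalPurposeExecutor;

    KeyRoutingExecutorService(Map<Object, ExecutorService> serialExecutors,
                              ExecutorService generalPurposeExecutor) {
        this.serialExecutors = serialExecutors;
        this.generalPurposeExecutor = generalPurposeExecutor;
    }

    /** Picks the dedicated executor for a keyed task, else the general pool. */
    private ExecutorService route(Object task) {
        if (task instanceof Keyed) {
            ExecutorService serial = serialExecutors.get(((Keyed) task).getKey());
            if (serial != null) {
                return serial;
            }
        }
        return generalPurposeExecutor;
    }

    // Route on the original task object. The inherited submit() would wrap it in a
    // FutureTask first and hide the key; invokeAll/invokeAny are not overridden here,
    // so they still do that and their tasks end up on the general pool.
    @Override public <T> Future<T> submit(Callable<T> task) { return route(task).submit(task); }
    @Override public Future<?> submit(Runnable task) { return route(task).submit(task); }
    @Override public <T> Future<T> submit(Runnable task, T result) { return route(task).submit(task, result); }
    @Override public void execute(Runnable command) { route(command).execute(command); }

    private List<ExecutorService> all() {
        List<ExecutorService> all = new ArrayList<>(serialExecutors.values());
        all.add(generalPurposeExecutor);
        return all;
    }

    @Override public void shutdown() {
        for (ExecutorService e : all()) e.shutdown();
    }

    @Override public List<Runnable> shutdownNow() {
        List<Runnable> pending = new ArrayList<>();
        for (ExecutorService e : all()) pending.addAll(e.shutdownNow());
        return pending;
    }

    @Override public boolean isShutdown() {
        for (ExecutorService e : all()) if (!e.isShutdown()) return false;
        return true;
    }

    @Override public boolean isTerminated() {
        for (ExecutorService e : all()) if (!e.isTerminated()) return false;
        return true;
    }

    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService e : all()) {
            long remaining = Math.max(deadline - System.nanoTime(), 0);
            if (!e.awaitTermination(remaining, TimeUnit.NANOSECONDS)) return false;
        }
        return true;
    }
}
```

You would build it with a map from each serialised key to an `Executors.newSingleThreadExecutor()` plus something like `Executors.newFixedThreadPool(n)` for the rest, then hand it to callers as a plain ExecutorService.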