views:

326

answers:

3

I have a process which delegates asynch tasks to a pool of threads. I need to ensure that certain tasks are executed in order. So for example

Tasks arrive in order

Tasks a1, b1, c1, d1 , e1, a2, a3, b2, f1

Tasks can be executed in any order except where there is a natural dependancy, so a1,a2,a3 must be processed in that order by either allocating to the same thread or blocking these until I know the previous a# task was completed.

Currently it doesn't use the Java Concurrency package, but I'm considering changing to take avantage of the thread management.

Does anyone have a similar solution or suggestions of how to achieve this

+2  A: 

When you submit a Runnable or Callable to an ExecutorService you receive a Future in return. Have the threads that depend on a1 be passed a1's Future and call Future.get(). This will block until the thread completes.

So:

ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
  @Override
  public void run() {
    f1.get();
    ... // do stuff
  }
}
exec.submit(a2);

and so on.

cletus
I don't think this will work with a fixed thread pool, as the threads could all block on `f1.get()` at once and be deadlocked.
finnw
Tune the size of the pool as appropriate.
cletus
Or use a cached thread pool.
finnw
Cached thread pool has its own problem. Thread creation can get out of control if you submit too much.
cletus
A: 

Another option is to create your own executor, call it OrderedExecutor, and create an array of encapsulated ThreadPoolExecutor objects, with 1 thread per internal executor. You then supply a mechanism for choosing one of the internal objects, eg, you can do this by providing an interface that the user of your class can implement:

executor = new OrderedExecutor( 10 /* pool size */, new OrderedExecutor.Chooser() {
  public int choose( Runnable runnable ) {
     MyRunnable myRunnable = (MyRunnable)runnable;
     return myRunnable.someId();
  });

executor.execute( new MyRunnable() );

The implementation of OrderedExecutor.execute() will then use the Chooser to get an int, you mod this with the pool size, and that's your index into the internal array. The idea being that "someId()" will return the same value for all the "a's", etc.

Jack
A: 

When I've done this in the past I've usually had the ordering handled by a component which then submits callables/runnables to an Executor.

Something like.

  • Got a list of tasks to run, some with dependencies
  • Create an Executor and wrap with an ExecutorCompletionService
  • Search all tasks, any with no dependencies, schedule them via the completion service
  • Poll the completion service
  • As each task completes
    • Add it to a "completed" list
    • Reevaluate any waiting tasks wrt to the "completed list" to see if they are "dependency complete". If so schedule them
    • Rinse repeat until all tasks are submitted/completed

The completion service is a nice way of being able to get the tasks as they complete rather than trying to poll a bunch of Futures. However you will probably want to keep a Map<Future, TaskIdentifier> which is populated when a task is schedule via the completion service so that when the completion service gives you a completed Future you can figure out which TaskIdentifier it is.

If you ever find yourself in a state where tasks are still waiting to run, but nothing is running and nothing can be scheduled then your have a circular dependency problem.

Mike Q