I've extended FutureTask from java.util.concurrent to provide callbacks to track the execution of tasks submitted to an ExecutorService.

public class StatusTask<V> extends FutureTask<V> {

    private final ITaskStatusHandler<V> statusHandler;

    public StatusTask(Callable<V> callable, ITaskStatusHandler<V> statusHandler){
        super(callable);
        if (statusHandler == null)
            throw new NullPointerException("statusHandler cannot be null");
        this.statusHandler = statusHandler;
        statusHandler.TaskCreated(this);
    }

    @Override
    public void run() {
        statusHandler.TaskRunning(this);
        super.run();
    }

    @Override
    protected void done() {
        super.done();
        statusHandler.TaskCompleted(this);
    }

}

What I see is that if a task has been submitted but is still queued when I call cancel(true) on it, the run() method still gets called, and FutureTask.run() (most likely) checks that the task is cancelled and doesn't call the wrapped callable.

Should I do something like this?

@Override
public void run() {
  if(!isCancelled()) {  
    statusHandler.TaskRunning(this);
    super.run();
  }
}

Or should I still call super.run()? Both approaches seem susceptible to a race between checking for cancellation and acting on it. Any thoughts appreciated.

+2  A: 

Your problem is that your future tasks are still executed after you have called cancel on them, right?

After a task has been submitted to the executor service, it should be managed by the executor. (You can still cancel a single task if you like.) To cancel everything, use the executor's shutdownNow method. (It attempts to stop the running tasks, typically by interrupting their threads, and returns the queued tasks without executing them.) A plain shutdown would still execute all submitted tasks.

Otherwise the executor does not "know" that a task was cancelled. It will call the run method regardless of the internal state of the future task.

The easiest approach would be to use the Executor framework as it is and write a Callable decorator.
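To illustrate the point above, here is a minimal sketch, assuming a single-threaded pool whose only worker is kept busy so that a second task has to wait in the queue. The class name `CancelledWhileQueuedSketch` and the two flags are made up for this example. It cancels the queued task and then records whether the executor still called run() and whether the wrapped callable was invoked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelledWhileQueuedSketch {

    /** Returns {run() was invoked, wrapped callable was invoked}. */
    static boolean[] demo() {
        AtomicBoolean runInvoked = new AtomicBoolean();
        AtomicBoolean callableInvoked = new AtomicBoolean();
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // Keep the only worker thread busy so the next task has to wait in the queue.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            FutureTask<String> queued = new FutureTask<String>(() -> {
                callableInvoked.set(true);
                return "result";
            }) {
                @Override
                public void run() {
                    runInvoked.set(true); // the executor got as far as calling run()
                    super.run();
                }
            };
            executor.execute(queued);
            queued.cancel(true);  // cancelled while still queued
            release.countDown();  // let the worker move on to the queued task

            executor.shutdown();  // queued tasks are still processed after shutdown()
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // no-op if already terminated
        }
        return new boolean[] { runInvoked.get(), callableInvoked.get() };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("run() invoked:    " + result[0]);
        System.out.println("callable invoked: " + result[1]);
    }
}
```

With a plain fixed thread pool, run() is still invoked on the cancelled task, while the wrapped callable is skipped.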

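class CallableDecorator<V> implements Callable<V> {

  private FutureTask<V> task; // set after construction, see below

  CallableDecorator(Callable<V> decorated){
     ...
  }

  void setTask(FutureTask<V> task){
    this.task = task;
    statusHandler.taskCreated(task);
  }

  public V call() throws Exception {
     try{
       statusHandler.taskRunning(task);
       return decorated.call();
     }finally{
       statusHandler.taskCompleted(task);
     }
  }
}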

The only problem is that the task cannot be passed to the decorator's constructor, because the decorator is itself a parameter of the FutureTask constructor. To break this cycle you have to use a setter or some proxy workaround with constructor injection. Maybe the task is not needed for the callback at all, and you can simply call statusHandler.callableStarted(decorated).
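The setter-based wiring could look roughly like the following sketch. `StatusHandler` is a hypothetical stand-in for the question's ITaskStatusHandler, and the event list exists only to make the order of callbacks visible. The task is run inline to keep the example deterministic; in real use it would be passed to executor.execute(task).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class SetterWiringSketch {

    /** Hypothetical stand-in for the question's ITaskStatusHandler. */
    interface StatusHandler<V> {
        void taskCreated(FutureTask<V> task);
        void taskRunning(FutureTask<V> task);
        void taskCompleted(FutureTask<V> task);
    }

    static class CallableDecorator<V> implements Callable<V> {
        private final Callable<V> decorated;
        private final StatusHandler<V> statusHandler;
        private volatile FutureTask<V> task; // unknown until the FutureTask exists

        CallableDecorator(Callable<V> decorated, StatusHandler<V> statusHandler) {
            this.decorated = decorated;
            this.statusHandler = statusHandler;
        }

        /** Must be called before the task is handed to an executor. */
        void setTask(FutureTask<V> task) {
            this.task = task;
            statusHandler.taskCreated(task);
        }

        @Override
        public V call() throws Exception {
            statusHandler.taskRunning(task);
            try {
                return decorated.call();
            } finally {
                statusHandler.taskCompleted(task);
            }
        }
    }

    /** Wires decorator and task together and returns the recorded events. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        StatusHandler<Integer> handler = new StatusHandler<Integer>() {
            @Override public void taskCreated(FutureTask<Integer> t) { events.add("created"); }
            @Override public void taskRunning(FutureTask<Integer> t) { events.add("running"); }
            @Override public void taskCompleted(FutureTask<Integer> t) { events.add("completed"); }
        };

        CallableDecorator<Integer> decorator = new CallableDecorator<>(() -> 42, handler);
        FutureTask<Integer> task = new FutureTask<>(decorator); // decorator first ...
        decorator.setTask(task);                                // ... then close the cycle
        task.run(); // inline for the sketch; normally executor.execute(task)
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One caveat of this design: taskCompleted fires inside call(), that is, before the FutureTask has stored the result, so task.isDone() is still false at that moment.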

Depending on your requirements you may have to signal exceptions and interrupts.

Basic implementation:

import static java.lang.System.out;
import static java.lang.Thread.currentThread;

import java.util.concurrent.Callable;

class CallableDecorator<T> implements Callable<T> {

    private final Callable<T> decorated;

    CallableDecorator(Callable<T> decorated) {
        this.decorated = decorated;
    }

    @Override
    public T call() throws Exception {
        out.println("before " + currentThread());
        try {
            return decorated.call();
        } catch (InterruptedException e) {
            out.println("interupted " + currentThread());
            throw e;
        } finally {
            out.println("after " + currentThread());
        }
    }
}

// Assumes static imports of Executors.newFixedThreadPool, Thread.sleep,
// TimeUnit.MINUTES, TimeUnit.SECONDS and System.out.
ExecutorService executor = newFixedThreadPool(1);
Future<Long> normal = executor.submit(new CallableDecorator<Long>(
        new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                return System.currentTimeMillis();
            }
        }));
out.println(normal.get());

Future<Long> blocking = executor.submit(new CallableDecorator<Long>(
        new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                sleep(MINUTES.toMillis(2)); // blocking call
                return null;
            }
        }));

sleep(SECONDS.toMillis(1));
blocking.cancel(true); // or executor.shutdownNow();

Output:

before Thread[pool-1-thread-1,5,main]
after Thread[pool-1-thread-1,5,main]
1259347519104
before Thread[pool-1-thread-1,5,main]
interupted Thread[pool-1-thread-1,5,main]
after Thread[pool-1-thread-1,5,main]
Thomas Jung
Seriously, you want to control an individual callable by killing the whole executor? That would work great if you had one executor per callable, in which case, why even use executors? Usually you want to cancel jobs selectively, hence the cancel method on Future.
james
No. The sample code shows how to kill the executor or a single task.
Thomas Jung
+2  A: 

You're right that there's a race there. FutureTask#done() will be called at most once, so if the task has already been canceled before it was ever run via RunnableFuture#run(), you'll have missed the call to FutureTask#done().
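A small sketch of that ordering, with a made-up `CountingTask` that only counts done() invocations: cancelling a task that never started triggers done() right away, and a later run() (which is what an executor would eventually do) does not trigger it again.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class DoneBeforeRunSketch {

    /** Counts done() invocations; the callable itself does nothing interesting. */
    static class CountingTask extends FutureTask<String> {
        final AtomicInteger doneCalls = new AtomicInteger();

        CountingTask() {
            super(() -> "result");
        }

        @Override
        protected void done() {
            doneCalls.incrementAndGet();
        }
    }

    /** Returns the done() counts observed {after cancel(), after a later run()}. */
    static int[] cancelThenRun() {
        CountingTask task = new CountingTask();
        task.cancel(true);                       // never started: done() fires here
        int afterCancel = task.doneCalls.get();
        task.run();                              // already cancelled, so this returns immediately
        int afterRun = task.doneCalls.get();
        return new int[] { afterCancel, afterRun };
    }

    public static void main(String[] args) {
        int[] counts = cancelThenRun();
        System.out.println("done() calls after cancel: " + counts[0]);
        System.out.println("done() calls after run:    " + counts[1]);
    }
}
```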

Have you considered a simpler approach that always issues a symmetric set of paired calls to ITaskStatusHandler#taskRunning() and ITaskStatusHandler#taskCompleted(), like so?

@Override
public void run() {
  statusHandler.TaskRunning(this);
  try {
    super.run();
  } finally {
    statusHandler.TaskCompleted(this);
  }
}

Once RunnableFuture#run() is called, it's true that your task is running, or at least trying to run. Once FutureTask#run() returns, your task is no longer running. It so happens that in the case of cancellation, the transition is (nearly) immediate.
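As a rough, self-contained illustration of the paired calls, the sketch below replaces the status handler with a plain event list (an assumption made to keep the example short) and runs the tasks inline on the calling thread, so the unsynchronized list is safe here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class SymmetricStatusSketch {

    static class StatusTask<V> extends FutureTask<V> {
        private final List<String> events; // stand-in for the status handler

        StatusTask(Callable<V> callable, List<String> events) {
            super(callable);
            this.events = events;
        }

        @Override
        public void run() {
            events.add("running");
            try {
                super.run();
            } finally {
                // Always paired with "running", whether or not the callable was invoked.
                events.add(isCancelled() ? "completed (cancelled)" : "completed");
            }
        }
    }

    static List<String> runNormally() {
        List<String> events = new ArrayList<>();
        new StatusTask<String>(() -> "result", events).run();
        return events;
    }

    static List<String> cancelThenRun() {
        List<String> events = new ArrayList<>();
        StatusTask<String> task = new StatusTask<String>(() -> "result", events);
        task.cancel(true); // cancelled before any thread picked it up
        task.run();        // an executor would still call this later
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runNormally());
        System.out.println(cancelThenRun());
    }
}
```

In both cases the two events arrive as a pair; for the cancelled task they are simply back to back. If run() is never called at all, neither event is recorded.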

Avoiding the call to ITaskStatusHandler#taskRunning() when the inner Callable or Runnable is never invoked by FutureTask#run() requires some shared state between the inner function and the FutureTask-derived type itself: when the inner function is first called, it sets a flag that the outer type can observe as a latch, indicating that the function did start running before being canceled. By then, however, you would already have had to commit to calling ITaskStatusHandler#taskRunning(), so the distinction isn't very useful.
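For completeness, the shared flag could be sketched as follows. `FlaggedTask`, `of` and `bodyStarted` are names invented for this example; the flag is an AtomicBoolean that the wrapped callable sets as its first action.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class StartedFlagSketch {

    static class FlaggedTask<V> extends FutureTask<V> {
        private final AtomicBoolean started;

        private FlaggedTask(Callable<V> inner, AtomicBoolean started) {
            // Wrap the inner callable so that it raises the flag when it is actually invoked.
            super(() -> {
                started.set(true);
                return inner.call();
            });
            this.started = started;
        }

        static <V> FlaggedTask<V> of(Callable<V> inner) {
            return new FlaggedTask<>(inner, new AtomicBoolean());
        }

        /** True once the wrapped callable has begun executing. */
        boolean bodyStarted() {
            return started.get();
        }
    }

    static boolean startedWhenRun() {
        FlaggedTask<String> task = FlaggedTask.of(() -> "result");
        task.run();
        return task.bodyStarted();
    }

    static boolean startedWhenCancelledFirst() {
        FlaggedTask<String> task = FlaggedTask.of(() -> "result");
        task.cancel(true); // cancelled before run()
        task.run();        // returns without invoking the callable
        return task.bodyStarted();
    }

    public static void main(String[] args) {
        System.out.println("started when run:             " + startedWhenRun());
        System.out.println("started when cancelled first: " + startedWhenCancelledFirst());
    }
}
```

The flag only becomes observable after the callable has started, which is the limitation noted above: a "running" notification issued before super.run() cannot depend on it.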

I've been struggling with a similar design problem lately, and wound up settling on the symmetric before and after operations in my overridden FutureTask#run() method.

seh
I assume the call in the finally clause should be statusHandler.TaskCompleted(this). Is there any case where the run() method would not be called but the done() method would?
nos
Thanks for catching the mistake. I fixed the call in the finally block. Yes, it's possible that done() can be called without run() having been called. See FutureTask#Sync#innerCancel(boolean). There, provided that the task has not finished running, which includes it never having started running, you can see that done() will be called. Note that done() will be called zero or one times for any FutureTask instance: zero if neither run() nor cancel() is ever called, once otherwise.
seh
It seems, though, that if you submit it to an executor, done() will be called if the task is cancelled before it is executed, although the executor doesn't know about that. The executor only knows about runnables/callables. So run() will still be called when the task eventually reaches the front of the queue; in that case FutureTask#run() essentially does nothing, though.
nos