I'm running multiple idempotent tasks to gather one batch of data. I've found that the computation is often delayed significantly by a couple of tasks out of a hundred.

What I'd like is a way of watching these tasks and launching the stragglers again if they are significantly delayed.

Is there a standard library or idiom for doing this in Java? I'm currently using the ExecutorService/ExecutorCompletionService pair to get the job done.

A:

If you have access to the Future object representing each task, you can check isDone() and call cancel(true) if required. You'd have to poll these Future objects and resubmit the slow tasks accordingly. This also depends on your underlying Runnables handling InterruptedException appropriately, because cancel(true) only interrupts the thread running the task; it cannot force the task to stop.
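One way to do that polling is sketched below, assuming the tasks are idempotent Callables identified by an integer id and that the pool has enough threads for every task to start immediately (the timeout is measured from submission, so tasks waiting in the queue would also count as slow). The class StragglerRetry, its runAll method and the fixed timeoutMs are hypothetical names for illustration, not part of any library:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

// Hypothetical helper: names and parameters are illustrative, not a library API.
final class StragglerRetry {
    // Submits makeTask.apply(id) for ids 0..count-1, then polls the Futures.
    // A task still unfinished timeoutMs after its (re)submission is cancelled
    // with cancel(true) and submitted again. Returns the results keyed by id.
    static <T> Map<Integer, T> runAll(ExecutorService pool,
                                      IntFunction<Callable<T>> makeTask,
                                      int count, long timeoutMs)
            throws InterruptedException, ExecutionException {
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        Map<Integer, Future<T>> pending = new HashMap<>();
        Map<Integer, Long> submittedAt = new HashMap<>();
        Map<Integer, T> results = new HashMap<>();
        for (int id = 0; id < count; id++) {
            pending.put(id, pool.submit(makeTask.apply(id)));
            submittedAt.put(id, System.nanoTime());
        }
        while (!pending.isEmpty()) {
            Iterator<Map.Entry<Integer, Future<T>>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, Future<T>> e = it.next();
                int id = e.getKey();
                Future<T> f = e.getValue();
                if (f.isDone()) {
                    results.put(id, f.get()); // a failed task surfaces as ExecutionException
                    it.remove();
                } else if (System.nanoTime() - submittedAt.get(id) > timeoutNanos) {
                    f.cancel(true); // interrupts the worker thread if the task is running
                    e.setValue(pool.submit(makeTask.apply(id)));
                    submittedAt.put(id, System.nanoTime());
                }
            }
            if (!pending.isEmpty()) {
                Thread.sleep(10); // polling interval
            }
        }
        return results;
    }
}
```

For example, with a 10-thread pool, ten tasks and a 500 ms timeout, a task whose first attempt blocks in Thread.sleep(10_000) is interrupted by cancel(true) after roughly half a second, and its resubmitted attempt supplies the result. Because the tasks are idempotent, running the same id twice is harmless.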

Brian Agnew
A:

You could create a task manager that holds a reference to each of the tasks. The manager is responsible for launching each task and for managing the ExecutorService. The first thing each task does is tell the manager that it has started, and the last thing it does is tell the manager that it has finished. From these notifications the manager can build up a statistical picture, such as the average time taken to perform each class of task.

The task manager periodically scans its list of running tasks, looking for 'outliers': tasks that are still running and have drifted significantly beyond the average time for their task class. It can then cancel these tasks and restart them.
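The manager still needs a rule for "drifted significantly from the average". One option, swapped in here rather than taken from the answer, is to keep a running mean and sample standard deviation per task class with Welford's online algorithm and flag a task once its elapsed time exceeds mean + k standard deviations. The class RunTimeStats and its parameters k, minFactor and minSamples are hypothetical; treat this as a sketch, not a tuned policy:

```java
// Hypothetical helper (not from the answer): per-task-class run-time
// statistics kept with Welford's online algorithm, O(1) memory per class.
final class RunTimeStats {
    private long n;       // number of completed runs recorded
    private double mean;  // running mean of the run times (ms)
    private double m2;    // running sum of squared deviations from the mean

    public synchronized void record(double elapsedMs) {
        n++;
        double delta = elapsedMs - mean;
        mean += delta / n;
        m2 += delta * (elapsedMs - mean);
    }

    public synchronized double mean() {
        return mean;
    }

    // Sample standard deviation; 0 until at least two runs are recorded.
    public synchronized double stdDev() {
        return n > 1 ? Math.sqrt(m2 / (n - 1)) : 0.0;
    }

    // True once runningMs exceeds mean + k * stdDev, but never below
    // minFactor * mean (so near-zero variance does not flag every slightly
    // slow task). Always false until minSamples runs have been recorded.
    public synchronized boolean isStraggler(double runningMs, double k,
                                            double minFactor, long minSamples) {
        if (n < minSamples) {
            return false;
        }
        double threshold = Math.max(mean + k * stdDev(), minFactor * mean);
        return runningMs > threshold;
    }
}
```

For example, after runs of 100, 110, 90, 100 and 100 ms (mean 100 ms, sample standard deviation about 7.1 ms), isStraggler(150, 3, 1.2, 5) returns true and isStraggler(115, 3, 1.2, 5) returns false, since the threshold is about 121 ms.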

Below is a very rough outline of what you could do...

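public abstract class Task implements Runnable {
    protected final TaskManager manager_;
    protected final String taskClass_;  // tasks in the same class have comparable run times
    protected String taskId_ = null;

    protected Task(TaskManager manager, String taskClass) {
        manager_ = manager;
        taskClass_ = taskClass;
    }

    /*
     * Override this and perform the specific task. It must respond to
     * interruption (blocking calls throw InterruptedException, loops check
     * Thread.interrupted()), otherwise cancelling a straggler has no effect.
     */
    protected abstract void perform() throws InterruptedException;

    public void run() {
        try {
            manager_.taskStarted(this);
            perform();
            manager_.taskCompleted(this);
        } catch (InterruptedException e) {
            manager_.taskAborted(this);
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}


import java.util.Map;
import java.util.concurrent.*;

public class TaskManager implements Runnable {
    // A task is a straggler once it runs this many times longer than the
    // average for its task class (an arbitrary choice; tune for your workload).
    private static final double STRAGGLER_FACTOR = 3.0;

    private final ExecutorService service_ = Executors.newFixedThreadPool(8);
    // Monitoring runs on its own scheduler so it never ties up a worker thread.
    private final ScheduledExecutorService monitor_ =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<Task, Future<?>> futures_ = new ConcurrentHashMap<>();
    private final Map<Task, Long> startTimes_ = new ConcurrentHashMap<>();
    // task class id -> {total elapsed ms, number of completed runs}
    private final Map<String, long[]> stats_ = new ConcurrentHashMap<>();

    public TaskManager() {
        // start the monitoring thread: scan the running tasks once a second.
        monitor_.scheduleAtFixedRate(this, 1, 1, TimeUnit.SECONDS);
    }

    public void runTask(Task t) {
        // Register the Future before the task can start, so it can be cancelled.
        FutureTask<Void> future = new FutureTask<>(t, null);
        futures_.put(t, future);
        service_.execute(future);
    }

    public void taskStarted(Task t) {
        // Note the time this task started; it is now in the running set.
        startTimes_.put(t, System.currentTimeMillis());
    }

    public void taskCompleted(Task t) {
        futures_.remove(t);
        Long start = startTimes_.remove(t);
        if (start == null) {
            return;
        }
        long elapsed = System.currentTimeMillis() - start;
        // Add this run to the statistics for its task class.
        stats_.merge(t.taskClass_, new long[] {elapsed, 1},
                (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
    }

    public void taskAborted(Task t) {
        // Leave the statistics untouched. run() has already removed a
        // cancelled task from the running set before restarting it.
    }

    public void run() {
        long now = System.currentTimeMillis();
        for (Map.Entry<Task, Long> e : startTimes_.entrySet()) {
            Task t = e.getKey();
            long[] s = stats_.get(t.taskClass_);
            if (s == null) {
                continue; // no completed runs of this class yet
            }
            double average = (double) s[0] / s[1];
            if (now - e.getValue() > STRAGGLER_FACTOR * average) {
                // Cancel the straggler (interrupting it) and start it again.
                startTimes_.remove(t);
                Future<?> f = futures_.remove(t);
                if (f != null) {
                    f.cancel(true);
                }
                runTask(t);
            }
        }
    }

    public void shutdown() {
        monitor_.shutdownNow();
        service_.shutdownNow();
    }
}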
Adrian Regan