views:

587

answers:

3

So, I'm curious. How do you handle setting maximum execution time for threads? When running in a thread pool?

I have several techniques but, I'm never quite satisfied with them. So, I figure I'd ask the community how they go about it.

+1  A: 

Normally, I just poll regularly a control object from the threaded code. Something like:

interface ThreadControl {
    boolean shouldContinue();
}

class Timer implements ThreadControl {
    public boolean shouldContinue() {
        // returns false if max_time has elapsed
    }
}

class MyTask implements Runnable {
    private tc;
    public MyTask(ThreadControl tc) {
        this.tc = tc;
    }
    public void run() {
        while (true) {
            // do stuff
            if (!tc.shouldContinue())
                break;
        }
    }
}
martin
+2  A: 

How about:

Submit your Callable to the ExecutorService and keep a handle to the returned Future.

ExecutorService executorService = ... // Create ExecutorService.
Callable<Result> callable = new MyCallable(); // Create work to be done.
Future<Result> fut = executorService.submit(callable);

Wrap the Future in an implementation of Delayed whereby Delayed's getDelay(TimeUnit) method returns the maximum execution time for the work in question.

public class DelayedImpl<T> implements Delayed {
  private final long maxExecTimeMillis;
  private final Future<T> future;

  public DelayedImpl(long maxExecTimeMillis, Future<T> future) {
    this.maxExecMillis = maxExecMillis;
    this.future = future;
  }

  public TimeUnit getDelay(TimeUnit timeUnit) {
    return timeUnit.convert(maxExecTimeMillis, TimeUnit.MILLISECONDS);
  }

  public Future<T> getFuture() {
    return future;
  }
}

DelayedImpl impl = new DelayedImpl(3000L, fut); // Max exec. time == 3000ms.

Add the `DelayedImpl` to a `DelayQueue`.

Queue<DelayedImpl> queue = new DelayQueue<DelayImpl>();
queue.add(impl);

Have a thread repeatedly take() from the queue and check whether each DelayedImpl's Future is complete by calling isDone(); If not then cancel the task.

new Thread(new Runnable() {
  public void run() {
    while (!Thread.interrupted) {
      DelayedImpl impl = queue.take(); // Perform blocking take.
      if (!impl.getFuture().isDone()) {
        impl.getFuture().cancel(true);
      }
    }
  }
}).start();

The main advantage to this approach is that you can set a different maximum execution time per task and the delay queue will automatically return the task with the smallest amount of execution time remaining.

Adamski
Future.get(long timeout, TimeUnit unit) that throws TimeoutException surely will do that? I presume in the catch for this exception you call future.cancel(true)...
JeeBee
Future.get(long,TimeUnit) will block for up to a specified time for that *specific* Future to complete. However, the solution I've added allows you to have a single thread checking *all* Callables in progress rather than *blocking* on a specific one.Suppose I submit a Callable that should take 5 minutes; in my "check" thread I call future.get(5L,TimeUnit.MINUTES). Another Callable is then submitted with a maximum execution time of 10 seconds. However, the thread won't look at whether this second Callable has been running for >10 seconds until the previous blocking call has returned.
Adamski
+1  A: 

Adamski:

I believe that your implementation of the Delayed Interface requires some adjustment in order to work properly. The return value of 'getDelay()' should return a negative value if the amount of time elapsed from the instantiation of the object has exceeded the maximum lifetime. To achieve that, you need to store the time when the task was created (and presumably started). Then each time 'getDelay()' is invoked, calculate whether or not the maximum lifetime of the thread has been exceeded. As in:

class DelayedImpl<T> implements Delayed {

    private Future<T> task;
    private final long maxExecTimeMinutes = MAX_THREAD_LIFE_MINUTES;
    private final long startInMillis = System.currentTimeMillis();

    private DelayedImpl(Future<T> task) {
        this.task = task;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert((startInMillis + maxExecTimeMinutes*60*1000) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed o) {
        Long thisDelay = getDelay(TimeUnit.MILLISECONDS);
        Long thatDelay = o.getDelay(TimeUnit.MILLISECONDS);
        return thisDelay.compareTo(thatDelay);
    }

    public Future<T> getTask() {
        return task;
    }
}
Jason Buberel