I'm looking for an ExecutorService implementation that can be provided with a timeout. Tasks that are submitted to the ExecutorService are interrupted if they take longer than the timeout to run. Implementing such a beast isn't such a difficult task, but I'm wondering if anybody knows of an existing implementation.

Here's what I came up with based on some of the discussion below. Any comments?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture<?>>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
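
A side note on the class above, offered as a sketch rather than a fix: by default a `ScheduledThreadPoolExecutor` keeps a cancelled task in its queue until the task's delay elapses, so with a long timeout the `cancel(false)` calls in `afterExecute` can still leave entries behind. Since Java 7, `setRemoveOnCancelPolicy(true)` removes them at cancellation time. The class name `RemoveOnCancelDemo` and the helper `queueSizeAfterCancel` are made up for illustration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RemoveOnCancelDemo {

    // Schedules a long-delay no-op, cancels it, and reports how many
    // entries are still sitting in the scheduler's queue afterwards.
    public static int queueSizeAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.setRemoveOnCancelPolicy(removeOnCancel); // available since Java 7
        try {
            Runnable noOp = () -> { };
            ScheduledFuture<?> pending = scheduler.schedule(noOp, 5, TimeUnit.HOURS);
            pending.cancel(false);
            return scheduler.getQueue().size();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy:   " + queueSizeAfterCancel(false));
        System.out.println("remove on cancel: " + queueSizeAfterCancel(true));
    }
}
```

`Executors.newSingleThreadScheduledExecutor()` returns a wrapper that does not expose this setter, which is why the sketch constructs `ScheduledThreadPoolExecutor` directly.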
+1  A: 

Wrap the task in a FutureTask and you can specify a timeout when waiting on it. Look at the example in my answer to this question:

http://stackoverflow.com/questions/1247390/java-native-process-timeout/1249984#1249984
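
For readers who do not want to follow the link, here is a minimal sketch of the general idea, not the linked answer's code: submit the task, wait on the returned `Future` with a timeout, and cancel with interruption if the wait expires. `FutureTimeoutDemo`, `callWithTimeout` and `timesOut` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutDemo {

    // Waits at most `timeout` for the task; on timeout, interrupts it and rethrows.
    public static <T> T callWithTimeout(ExecutorService executor, Callable<T> task,
                                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true); // true: interrupt the worker thread running the task
            throw e;
        }
    }

    // Demo helper: reports whether a task that sleeps for sleepMillis hits the timeout.
    public static boolean timesOut(final long sleepMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> sleeper = () -> {
                Thread.sleep(sleepMillis);
                return null;
            };
            callWithTimeout(executor, sleeper, timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("slow task timed out: " + timesOut(2000, 100));
        System.out.println("fast task timed out: " + timesOut(10, 2000));
    }
}
```

Two caveats with this approach: the caller blocks in `get`, and the clock starts when the caller begins waiting, not when the task actually starts running on a worker thread.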

ZZ Coder
I realize there are a couple ways to do this using the `java.util.concurrent` classes, but I'm looking for an `ExecutorService` implementation.
scompt.com
If you are saying that you want your ExecutorService to hide the fact that timeouts are being added from client code, you could implement your own ExecutorService that wraps every runnable handed to it with a FutureTask before executing them.
erikprice
+3  A: 

You can use a ScheduledExecutorService for this. First, submit the task once so that it begins immediately, and retain the Future that is returned. Then schedule a second task that cancels the retained Future after some period of time.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
 final Future handler = executor.submit(new Callable(){... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel(true); // true: interrupt the task if it is still running
     }
 }, 10000, TimeUnit.MILLISECONDS);

This lets your handler (the main functionality to be interrupted) run for up to 10 seconds, then cancels (i.e. interrupts) that specific task.
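
A self-contained version of the same idea might look like the sketch below. The sleeping task stands in for the elided `Callable`, and `ScheduledCancelDemo` and `cancelledByTimeout` are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledCancelDemo {

    // Runs a task that sleeps for taskMillis and schedules a cancel after
    // timeoutMillis. Returns true if the task ended up cancelled.
    public static boolean cancelledByTimeout(final long taskMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        try {
            Callable<Void> work = () -> {
                Thread.sleep(taskMillis); // placeholder for the real work
                return null;
            };
            final Future<Void> handler = executor.submit(work);

            // Second task: cancel (and interrupt) the first one after the timeout.
            Runnable canceller = () -> handler.cancel(true);
            executor.schedule(canceller, timeoutMillis, TimeUnit.MILLISECONDS);

            handler.get(); // throws CancellationException if the cancel won the race
            return false;
        } catch (CancellationException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("slow task cancelled: " + cancelledByTimeout(2000, 100));
        System.out.println("fast task cancelled: " + cancelledByTimeout(10, 2000));
    }
}
```

The pool size of 2 matters here: one thread runs the work while the other stays free to fire the cancel.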

John V.
Interesting idea, but what if the task finishes before the timeout (which it normally will)? I'd rather not have tons of cleanup tasks waiting to run only to find out their assigned task has already completed. There'd need to be another thread monitoring the Futures as they finish to remove their cleanup tasks.
scompt.com
The executor will only schedule this cancel once. If the task is completed then the cancel is a no-op and work continues unchanged. There only needs to be one extra thread scheduling the cancellations and one thread to run the tasks. You could have two executors, one to submit your main tasks and one to cancel them.
John V.
You could hook on `InterruptedException`. Also see [this answer](http://stackoverflow.com/questions/2275443/how-to-timeout-a-thread/2275596#2275596).
BalusC
That's true, but what if the timeout is 5 hours and 10k tasks are executed in that time? I'd like to avoid having all those no-ops lying around taking up memory and causing context switches.
scompt.com
Understandable. However, if you have the task at 5 hours then your thread will park for (5, TimeUnit.HOURS). Since it's parked, there would be no switching until it is unparked, and the interruption will occur then.
John V.
@BalusC: Ideally, the tasks themselves would remain unchanged.
scompt.com
@John W.: Assuming that all tasks complete on time, there would still be 10k unnecessary interruptions, regardless of how long the timeout is.
scompt.com
@Scompt Not necessarily. There would be 10k future.cancel() invocations, but if the future has already completed then the cancel will fast-path out and not do any unnecessary work. If you do not want 10k extra cancel invocations then this may not work, but the amount of work done when a task is already completed is very small.
John V.
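
The fast-path claim in the comment above is easy to check with a small sketch: calling `cancel` on a `Future` that has already completed returns `false` and leaves the result untouched. `CancelAfterCompletionDemo` and `cancelAfterCompletion` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelAfterCompletionDemo {

    // Lets a trivial task finish, then tries to cancel it.
    // Returns whatever cancel(true) reports.
    public static boolean cancelAfterCompletion() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> "finished";
            Future<String> future = executor.submit(task);
            future.get();               // block until the task has completed normally
            return future.cancel(true); // too late: nothing left to cancel
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancel after completion returned: " + cancelAfterCompletion());
    }
}
```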
These problems could be resolved with an `ExecutorCompletionService`. I'm looking into that now.
scompt.com
@John W.: I just realized another issue with your implementation. I need the timeout to begin when the task starts execution, as I commented earlier. I think the only way to do that is to use the `beforeExecute` hook.
scompt.com
This answer got me thinking about the solution that I ended up with (posted in the question above), so I'm going to accept it. Thanks!
scompt.com
Thank you, scompt. I looked at your code and can't find an error in it. The thread-safety aspect of it is well done.
John V.