I have submitted tasks using executors and I need them to stop after some time (e.g. 5 minutes). I have tried doing it like this:

   for (Future<?> fut : e.invokeAll(tasks, 300, TimeUnit.SECONDS)) {
         try {
             fut.get(); 
         } catch (CancellationException ex) {
             fut.cancel(true);   
             tasks.clear();
         } catch(ExecutionException ex){
             ex.printStackTrace(); // FIXME: handled with printStackTrace
         }
   }

But I always get an error: I have a shared Vector that needs to be modified by the tasks and then read by a thread, and even if I stop all the tasks, when the timeout occurs I get:

Exception in thread "Thread-1" java.util.ConcurrentModificationException

Is there something wrong? How can I stop the tasks submitted that are still working after 5 minutes?

A: 

The most common cause of ConcurrentModificationException is the vector being modified while it is being iterated. Often this happens within a single thread. You need to hold a lock on the Vector for the whole iteration (and be careful not to deadlock).

Tom Hawtin - tackline
Yes I know why the Exception is being thrown, but it shouldn't. The iteration is after the part of code I posted and so, if the code works well, I shouldn't get an exception...
Raffo
+1  A: 

The ConcurrentModificationException is coming from your call to tasks.clear() while your Executor is iterating over your tasks Vector. What you can try is calling shutdownNow() on your ExecutorService.
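
The shutdownNow() suggestion can be sketched roughly as follows. This is a minimal illustration, not the asker's code: the class ShutdownNowSketch, the helper stopPool and the grace period are hypothetical names chosen for the example. Note that shutdownNow() only interrupts the worker threads, so the tasks themselves still have to react to the interrupt.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowSketch {
    // Hypothetical helper: returns true if the pool terminated within the grace period.
    static boolean stopPool(ExecutorService pool, long graceMillis) {
        // shutdownNow() interrupts running workers and returns the tasks that never started.
        List<Runnable> neverStarted = pool.shutdownNow();
        System.out.println("Tasks that never started: " + neverStarted.size());
        try {
            return pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            try {
                Thread.sleep(60_000); // stands in for long-running work
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): fall through and finish.
            }
        });
        System.out.println("Terminated: " + stopPool(pool, 2_000));
    }
}
```

If a task ignores interrupts, awaitTermination will simply time out and stopPool returns false, which may be why this approach appears not to work in some setups.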

akf
This doesn't seem to work...
Raffo
+3  A: 

Just because you call cancel() on Future doesn't mean that the task will stop automatically. You have to do some work within the task to make sure that it will stop:

  • Use cancel(true) so that an interrupt is sent to the task
  • Handle InterruptedException. If a function in your task throws an InterruptedException, make sure you exit gracefully as soon as possible upon catching the exception.
  • Periodically check Thread.interrupted() if the task does continuous computation.

For example:

import java.util.concurrent.Callable;

class LongTask implements Callable<Double> {
    public Double call() {

         // Sleep for a while; handle InterruptedException appropriately
         try {
             Thread.sleep(10000);
         } catch (InterruptedException ex) {
             System.out.println("Exiting gracefully!");
             return null;
         }


         // Compute for a while; check Thread.interrupted() periodically
         double sum = 0.0;
         for (long i = 0; i < 10000000; i++) {
             sum += 10.0;
             if (Thread.interrupted()) {
                 System.out.println("Exiting gracefully");
                 return null;
             }
         }

         return sum;
    } 
}

Also, as other posts have mentioned: ConcurrentModificationException can be thrown even when using the thread-safe Vector class, because the iterators you obtain from a Vector are not thread-safe, so iteration needs to be synchronized. The enhanced for-loop uses iterators, so watch out:

Vector<Double> vector = new Vector<Double>();
vector.add(1.0);
vector.add(2.0);

// Not thread safe!  If another thread modifies "vector" during the loop, then
// a ConcurrentModificationException will be thrown.
for (Double num : vector) {
    System.out.println(num);
}

// You can try this as a quick fix, but it might not be what you want:
synchronized (vector) {    // "vector" should be final so the lock object never changes
    for (Double num : vector) {
        System.out.println(num);
    }
}
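
As an alternative to locking, and only as a sketch under assumptions: if the shared collection is read far more often than it is written, java.util.concurrent.CopyOnWriteArrayList could replace the Vector. Its iterators work on a snapshot and never throw ConcurrentModificationException. This is a swapped-in technique, not what the question uses; the class and method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationSketch {
    // Iterates over the list while adding to it; returns the sum of the elements seen.
    static int sumWhileModifying() {
        List<Integer> results = new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        int sum = 0;
        for (Integer n : results) {
            // With a Vector this add() would make the next iteration step throw
            // ConcurrentModificationException. Here the iterator keeps reading the
            // snapshot taken when the loop started.
            results.add(n * 10);
            sum += n;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("Sum of the snapshot: " + sumWhileModifying());
    }
}
```

Each write copies the underlying array, so this trade-off only makes sense when modifications are rare compared to iterations.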
Matt Fichman
+1 good answer...
skaffman
Excellent - somehow I never came across Thread.interrupted() - I can use that tomorrow!
Kevin Day
First, calling future.cancel(true) does absolutely nothing. The contract of invokeAll states that it will cancel the tasks before returning, and the implementation uses a finally block to ensure it. Second, do not ever call Thread.interrupted(); doing so clears the interrupted state of the thread. Most implementations would want to use Thread.isInterrupted(). Clearing the flag should be scrutinized. Third, he doesn't have to handle the InterruptedException unless he is using blocking methods such as lock acquisition, and then the compiler ensures that he does. The FutureTask will catch exceptions.
Tim Bender
@Tim Bender: You're right: future.cancel(true) does nothing, I tested it myself. But I don't understand what you think I should do.
Raffo
@Tim Bender: You are absolutely right about Thread.interrupted() vs. Thread.isInterrupted(). The actual syntax would be Thread.currentThread().isInterrupted().
Matt Fichman
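
The difference between the two calls discussed in these comments can be seen in a small sketch. The class name InterruptFlagSketch and the method observe are hypothetical; the behaviour shown is that of the standard Thread API.

```java
class InterruptFlagSketch {
    // Returns the three observations in order: isInterrupted(), interrupted(), isInterrupted().
    static boolean[] observe() {
        Thread.currentThread().interrupt();                     // set the flag on this thread
        boolean first = Thread.currentThread().isInterrupted(); // reads the flag, leaves it set
        boolean second = Thread.interrupted();                  // reads the flag AND clears it
        boolean third = Thread.currentThread().isInterrupted(); // flag is now cleared
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) {
        boolean[] seen = observe();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2]);
    }
}
```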
A: 

fut.get() is a blocking call: even after the timeout, you will block until the task is done. If you want to stop as close to the 5-minute mark as possible, you do need to check the interrupt flag; I just recommend you do so using the Thread.isInterrupted() method, which preserves the interrupt state. If you just want to stop immediately and don't need to clean up any state, then throw an exception, which will be caught by the Future and reported to you as an ExecutionException.

fut.cancel(true) does not do anything as the invokeAll() method has already done this for you.
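
A rough sketch of both points, assuming tasks that poll the interrupt flag: invokeAll() with a timeout cancels whatever has not finished, and a task that checks Thread.currentThread().isInterrupted() stops shortly after. SpinTask, countCancelled and the 200 ms timeout are hypothetical stand-ins for the real tasks and the 5-minute limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutSketch {
    // Hypothetical task: keeps computing until its thread is interrupted.
    static class SpinTask implements Callable<Long> {
        public Long call() {
            long iterations = 0;
            // isInterrupted() reads the flag without clearing it.
            while (!Thread.currentThread().isInterrupted()) {
                iterations++;
            }
            return iterations;
        }
    }

    // Returns how many futures invokeAll() had already cancelled when it returned.
    static int countCancelled(long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            tasks.add(new SpinTask());
            tasks.add(new SpinTask());
            List<Future<Long>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            int cancelled = 0;
            for (Future<Long> f : futures) {
                if (f.isCancelled()) {
                    cancelled++; // no extra cancel(true) call was needed
                }
            }
            return cancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow(); // release the worker threads
        }
    }

    public static void main(String[] args) {
        System.out.println("Cancelled after timeout: " + countCancelled(200));
    }
}
```

Checking isCancelled() instead of calling get() avoids having to catch CancellationException for the tasks that timed out.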

Unless you use the "tasks" Collection somewhere else, you probably don't need to call clear() on it. This isn't going to be the source of your problem since the invokeAll() method is done with the List by the time you call clear(). But, if you need to start forming a list of new tasks to execute, I suggest you form a new List of tasks, not use an old List of new Tasks.

Unfortunately, I do not have an answer for your problem. I do not see enough information here to diagnose it. Nothing in the code snippet you provided indicates an improper (only unnecessary) use of library classes/methods. It would help if you included a full stack trace instead of the one-line error.

Tim Bender
I used the collection somewhere else, and it's in a while loop, so it needs to be cleared in order to be empty when the loop repeats. Of course I can do the clear() after the code shown in the post, and this should be OK. The important part of my question isn't the exception: what I need to know is how to stop the future after 5 minutes and, of course, I'll try throwing an exception as you suggested. I can even change the way I submit my tasks. I learnt this approach here: http://stackoverflow.com/questions/1322147/help-with-java-executors-wait-for-task-termination
Raffo