I need to run N slow calculations (where N is a fairly large number) and would like to do so on M threads, since the slow calculations spend a good deal of time waiting on IO. I have put together a small example that works well when all calculations succeed. However, if a calculation fails, the desired behavior is to stop processing further calculations. Each successful calculation has already written its result to a database, so I just need to determine which calculation failed and stop calculations that have not yet been started.

My approach is to submit the calculations through the ExecutorService returned by Executors.newFixedThreadPool. However, I don't see a clean way to detect that one of the calculations failed (in my example, returned false) and to stop calculations that have been submitted to the ExecutorService but not yet assigned a thread from the pool.

Is there a clean method to do that? Is there a better approach for me to consider?

import java.util.*;
import java.util.concurrent.*;

class Future
{
    static private class MyWorker implements Callable<Boolean>
    { 
     private Integer item;
     public MyWorker(Integer item)
     {
      this.item = item;
     }

     public Boolean call() throws InterruptedException
     {
      if (item == 42) 
      {
       return false;
      }
      else
      {
       System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
       Thread.sleep(1000);
       return true;
      }
     } 
    }

    static int NTHREADS = 2;

    public static void main(String args[]) 
    {
     Queue<Integer> numbers = new LinkedList<Integer>();  
     for (int i=1; i<10000; i++)
     {
      numbers.add(i);
     }

     System.out.println("Starting thread test.");

     ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);

     for (Integer i : numbers)
     {
      MyWorker my = new MyWorker(i);
      System.out.println("Submit..." + i.toString());
      exec.submit(my);
      System.out.println("... Done Submit");
     }

     exec.shutdown();

     System.out.println("Exiting thread test.");

    }
}

EDIT: Here's a working implementation of akf's suggestion. I still plan to look at the callback solution and hope for other suggestions.

import java.util.*;
import java.util.concurrent.*;

class MyFuture
{
    static private class MyWorker implements Callable<Boolean>
    { 
     private Integer item;
     public MyWorker(Integer item)
     {
      this.item = item;
     }

     public Boolean call() 
     {
      if (item == 42) 
      {
       return false;
      }
      else
      {
       System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
       try
       {
        Thread.sleep(1000);
       }
       catch (InterruptedException ie) 
       { 
        // Not much to do here except be grumpy they woke us up...
        // but restore the interrupt status so the pool can see it.
        Thread.currentThread().interrupt();
       } 
       return true;
      }
     } 
    }

    static int NTHREADS = 4;

    public static void main(String args[]) throws InterruptedException
    {
     Queue<Integer> numbers = new LinkedList<Integer>();  
     for (int i=1; i<100; i++)
     {
      numbers.add(i);
     }

     System.out.println("Starting thread test.");

     ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);

     List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();

     for (Integer i : numbers)
     {
      MyWorker my = new MyWorker(i);
      System.out.println("Submit..." + i.toString());
      Future<Boolean> f = exec.submit(my);
      futures.add(f);
      System.out.println("... Done Submit");
     }

     boolean done = false;

     while (!done)
     {
      Iterator<Future<Boolean>> it = futures.iterator();

      while (it.hasNext()) 
      {
       Future<Boolean> f = it.next();
       if (f.isDone())
       {
        try
        {
         System.out.println("CHECK RETURN VALUE");
         if (f.get()) 
         {
          it.remove();
         }
         else
         {     
          System.out.println("IMMEDIATE SHUTDOWN");
          exec.shutdownNow();
          done = true;
          break;
         }
        }
        catch (InterruptedException ie)
        {
        }
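        catch (ExecutionException ee)
        {
         // The calculation threw instead of returning; treat that as a failure too.
         // Otherwise this future is never removed and the outer loop never ends.
         System.out.println("IMMEDIATE SHUTDOWN");
         exec.shutdownNow();
         done = true;
         break;
        }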
       }
      }
      Thread.sleep(1000);
      if (futures.size() == 0)
      {
       done = true;
      }
     }

     exec.shutdown();

     System.out.println("Exiting thread test.");

    }
}
A: 
class Stopper
{
    boolean stopped = false;
    ExecutorService exec;

    Stopper(ExecutorService exec) { this.exec = exec; }

    public void stop() { if (!stopped) { stopped = true; exec.shutdown(); } }
}

static private class MyWorker implements Callable<Boolean>
{   
    private Integer item;
    private Stopper stopper;
    public MyWorker(Integer item, Stopper stopper)
    {
            this.stopper = stopper;
            this.item = item;
    }

    public Boolean call() throws InterruptedException
    {
            if (item == 42) 
            {
                    stopper.stop();
                    return false;
            }
            else
            {
                    System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
                    Thread.sleep(1000);
                    return true;
            }
    }       
}
tster
That's close to one of my thoughts as well. What bugs me about it is that the thread needs knowledge of its execution container. Still, that would certainly resolve the issue.
Eric J.
The stopped flag should be `volatile`, since it's accessed by multiple threads. But it's a moot point since any tasks already submitted will still be executed, even after the shutdown call.
erickson
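A minimal sketch of the flag idea with erickson's `volatile` point applied. Assumptions: the flag is an `AtomicBoolean` (same visibility guarantee as a `volatile` field), every task checks it before starting its slow work, and `FlagStopper` and `run` are hypothetical names. As erickson notes, queued tasks are still dequeued after the failure, but here they return immediately instead of doing the work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FlagStopper {
    /** Runs items 1..n on a pool; returns how many items actually did their work. */
    public static int run(int n, final int failingItem, int nThreads) throws InterruptedException {
        // Shared stop flag: AtomicBoolean gives the visibility guarantee a volatile field would.
        final AtomicBoolean stopped = new AtomicBoolean(false);
        final AtomicInteger processed = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);

        for (int i = 1; i <= n; i++) {
            final int item = i;
            exec.submit(new Callable<Boolean>() {
                public Boolean call() throws InterruptedException {
                    if (stopped.get()) {
                        return false;        // a failure was already seen: skip the work
                    }
                    if (item == failingItem) {
                        stopped.set(true);   // tasks that have not started yet will bail out
                        return false;
                    }
                    Thread.sleep(10);        // stand-in for the slow, IO-bound calculation
                    processed.incrementAndGet();
                    return true;
                }
            });
        }
        exec.shutdown();                     // accept no new tasks; queued ones still run (and bail out)
        exec.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Processed: " + run(100, 42, 2));
    }
}
```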
A:

You could make use of the Future that submit() returns for each Callable:

 Set<Future<Boolean>> futures = new HashSet<Future<Boolean>>();
 for (Integer i : numbers)
 {
    MyWorker my = new MyWorker(i);
    System.out.println("Submit..." + i.toString());
    Future<Boolean> f = exec.submit(my);
    futures.add(f);
    System.out.println("... Done Submit");
 }

 // get() blocks until that particular task has finished
 for (Future<Boolean> f : futures) {
   if (!f.get().booleanValue()) {
     exec.shutdown();
     break; // no need to wait on the remaining results
   }
 }
akf
What if the last task in the set was the first to run and failed quickly? This won't be discovered until the rest of the (unnecessary) tasks have been completed, because the calls to `get` the earlier tasks' results will block.
erickson
Good point. To fix this, the futures could be put in a pub/sub queue and tested in a separate thread.
akf
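akf's pub/sub idea is roughly what `java.util.concurrent.ExecutorCompletionService` provides (a different technique from the polling loop in the question's edit): it puts each finished task's Future on an internal queue in completion order, so `take()` hands back a quick failure as soon as it happens instead of after the tasks ahead of it in the Set. A sketch, assuming a failure is signalled by returning false or throwing; `CompletionOrder` and `run` are hypothetical names.

```java
import java.util.concurrent.*;

public class CompletionOrder {
    /** Returns how many results were inspected before stopping (n if everything succeeded). */
    public static int run(int n, final int failingItem, int nThreads) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        // Wraps the executor; finished tasks are queued in the order they complete.
        CompletionService<Boolean> ecs = new ExecutorCompletionService<Boolean>(exec);

        for (int i = 1; i <= n; i++) {
            final int item = i;
            ecs.submit(new Callable<Boolean>() {
                public Boolean call() throws InterruptedException {
                    if (item == failingItem) {
                        return false;        // simulated failure, as in the question
                    }
                    Thread.sleep(10);        // stand-in for the slow calculation
                    return true;
                }
            });
        }

        int inspected = 0;
        try {
            for (int i = 0; i < n; i++) {
                Future<Boolean> f = ecs.take();   // blocks only until the *next* task finishes
                inspected++;
                boolean ok;
                try {
                    ok = f.get();                 // already done, so this does not block
                } catch (ExecutionException e) {
                    ok = false;                   // a thrown exception also counts as a failure
                }
                if (!ok) {
                    break;
                }
            }
        } finally {
            // Drops the tasks that never started and interrupts the running ones.
            exec.shutdownNow();
        }
        return inspected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Inspected: " + run(100, 42, 4));
    }
}
```

The worker also no longer needs to know about the executor; only the consuming loop decides when to stop.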
I like this basic approach, though I changed several implementation details when testing it. I used a LinkedList, since a Set iterates in an undefined order and often returned Future instances that had not been assigned a thread yet. I also check isDone() before calling get() to avoid a wait (which makes the Set point a bit moot, I guess). Finally, I called shutdownNow() rather than shutdown(), because otherwise all submitted calculations will still be done. Posted the code as an edit to my question.
Eric J.
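To illustrate the difference described above: `shutdown()` lets every queued task run, while `shutdownNow()` removes the tasks that never started from the queue, interrupts the running ones, and returns the removed tasks. A sketch using a single-thread pool so the queue is still full when the pool is stopped; `ShutdownNowDemo` and `neverStarted` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.*;

public class ShutdownNowDemo {
    /** Submits n slow tasks to one thread, then stops the pool; returns how many never started. */
    public static int neverStarted(int n) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(1);
        for (int i = 0; i < n; i++) {
            exec.submit(new Callable<Boolean>() {
                public Boolean call() throws InterruptedException {
                    Thread.sleep(1000);   // long enough that the others are still queued
                    return true;
                }
            });
        }
        // shutdown() would let every queued task run; shutdownNow() drains the queue,
        // interrupts the running task, and hands the drained tasks back.
        List<Runnable> pending = exec.shutdownNow();
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return pending.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Never started: " + neverStarted(10));
    }
}
```

The Futures of the drained tasks are never completed, so code that later calls get() on them would block; the loop in the edit avoids this by stopping as soon as it calls shutdownNow().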
A:

Using a callback, as I outline in another answer, you can be notified of a failure and cancel all submitted jobs. (In my example, the Callback implementation class could have a reference to a Collection to which each Future was added.) For tasks that have already completed (or started, depending on the value of the argument), cancel does nothing. The rest will never be started.

erickson
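A minimal sketch of the callback idea, assuming a hypothetical `FailureCallback` interface (the example from the other answer is not reproduced here, and `CancelOnFailure` is also a hypothetical name). Each task reports failure through the callback rather than touching the executor, and the callback calls `cancel(false)` on every Future in a shared, thread-safe collection.

```java
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelOnFailure {
    // Hypothetical callback type for this sketch; not part of java.util.concurrent.
    interface FailureCallback {
        void onFailure(int item);
    }

    /** Runs items 1..n; returns how many of the submitted tasks ended up cancelled. */
    public static int run(int n, final int failingItem, int nThreads) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        // Thread-safe queue: pool threads iterate it while the main thread may still be adding.
        final Queue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
        final AtomicBoolean sawFailure = new AtomicBoolean(false);

        final FailureCallback callback = new FailureCallback() {
            public void onFailure(int item) {
                System.out.println("Calculation " + item + " failed; cancelling the rest");
                sawFailure.set(true);
                cancelAll(futures);
            }
        };

        for (int i = 1; i <= n && !sawFailure.get(); i++) {
            final int item = i;
            futures.add(exec.submit(new Callable<Boolean>() {
                public Boolean call() throws InterruptedException {
                    if (item == failingItem) {
                        callback.onFailure(item);   // report the failure; no executor reference needed
                        return false;
                    }
                    Thread.sleep(10);               // stand-in for the slow calculation
                    return true;
                }
            }));
        }
        if (sawFailure.get()) {
            cancelAll(futures);   // catch any Future added while the callback was iterating
        }

        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);

        int cancelled = 0;
        for (Future<Boolean> f : futures) {
            if (f.isCancelled()) {
                cancelled++;
            }
        }
        return cancelled;
    }

    // cancel(false): a task that has not started will never run; a task that is already
    // running is not interrupted and runs to completion, but its Future reports cancelled.
    private static void cancelAll(Queue<Future<Boolean>> futures) {
        for (Future<Boolean> f : futures) {
            f.cancel(false);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Cancelled: " + run(100, 42, 4));
    }
}
```

Because a task can fail while the main thread is still submitting, the sketch stops submitting once the flag is set and cancels once more afterwards, so a Future added during the callback's loop is not missed.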
A callback seems elegant. I'll try it out, but that will have to wait for morning...
Eric J.
I have an implementation of this working; however, the main thread exits as soon as the last job is submitted. Is there an elegant way for the thread to block until all submitted jobs are complete, or until the callback indicates failure?
Eric J.
Discovered ExecutorService.awaitTermination(), issue solved.
Eric J.
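A sketch of blocking with `awaitTermination()`, under the assumption that the failure signal is the failing task calling `shutdownNow()` on the pool (the Stopper idea from the first answer, with `shutdownNow()` instead of `shutdown()`). The main thread calls `shutdown()` and then blocks in `awaitTermination()`, which returns once every task has finished or once the failure has emptied the pool. `AwaitUntilDoneOrFailed` and the one-minute timeout are hypothetical choices.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitUntilDoneOrFailed {
    /** Runs items 1..n; returns true if the pool terminated before the timeout. */
    public static boolean run(int n, final int failingItem, int nThreads,
                              final AtomicInteger processed) throws InterruptedException {
        final ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        try {
            for (int i = 1; i <= n; i++) {
                final int item = i;
                exec.submit(new Callable<Boolean>() {
                    public Boolean call() throws InterruptedException {
                        if (item == failingItem) {
                            // Failure: drop every queued task and interrupt the running ones.
                            exec.shutdownNow();
                            return false;
                        }
                        Thread.sleep(10);            // stand-in for the slow calculation
                        processed.incrementAndGet();
                        return true;
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            // A task failed (and shut the pool down) before submission finished.
        }
        exec.shutdown();   // no new tasks; without a failure, queued ones still run to completion
        // Blocks until every task has finished, or until shutdownNow() has emptied the pool.
        return exec.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        boolean terminated = run(100, 42, 4, processed);
        System.out.println("Terminated: " + terminated + ", processed: " + processed.get());
    }
}
```

If a task fails before submission finishes, further `submit()` calls throw `RejectedExecutionException`, which the sketch treats as the signal to stop submitting.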