I need to run N slow calculations (where N is a fairly large number) and would like to do so on M threads, since the slow calculations have a good deal of IO wait time. I have put together a small example that works well when all calculations succeed. However, if a calculation fails, the desired behavior is to stop processing further calculations. Each successful calculation has already written its result to a database, so I just need to determine which calculation failed and stop the calculations that have not yet been started.
My approach is to use an ExecutorService created by Executors.newFixedThreadPool. However, I don't see a clean way to identify that one of the calculations failed (in my example, returned false) and to stop the calculations that have been submitted to the ExecutorService but not yet assigned a thread from the pool.
Is there a clean method to do that? Is there a better approach for me to consider?
import java.util.*;
import java.util.concurrent.*;
/**
 * Baseline example: queues many slow tasks on a fixed-size thread pool.
 *
 * This version submits every task and discards the value returned by
 * {@code submit}, so a task that returns {@code false} is never noticed
 * and the remaining queued tasks still run.
 *
 * NOTE: this class is itself named {@code Future}, so inside this file the
 * simple name {@code Future} refers to this class rather than to
 * {@code java.util.concurrent.Future}.
 */
class Future
{
    /**
     * A simulated slow calculation for a single item.
     * Item 42 fails immediately; every other item prints a trace line,
     * sleeps for one second and then reports success.
     */
    static private class MyWorker implements Callable<Boolean>
    {
        private final Integer item;

        public MyWorker(Integer item)
        {
            this.item = item;
        }

        /**
         * Runs the simulated calculation.
         *
         * @return {@code false} for item 42, {@code true} otherwise
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Override
        public Boolean call() throws InterruptedException
        {
            if (item == 42)
            {
                return false;
            }
            else
            {
                System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
                Thread.sleep(1000);
                return true;
            }
        }
    }

    /** Number of worker threads in the pool. */
    static int NTHREADS = 2;

    /**
     * Submits one worker per number in 1..9999 and then requests an orderly
     * shutdown of the pool. The results of the tasks are not inspected.
     *
     * @param args unused
     */
    public static void main(String args[])
    {
        Queue<Integer> numbers = new LinkedList<Integer>();
        for (int i = 1; i < 10000; i++)
        {
            numbers.add(i);
        }
        System.out.println("Starting thread test.");
        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);
        for (Integer i : numbers)
        {
            MyWorker my = new MyWorker(i);
            System.out.println("Submit..." + i.toString());
            // The returned future is dropped here, so a false result goes unobserved.
            exec.submit(my);
            System.out.println("... Done Submit");
        }
        // shutdown() only stops new submissions; already-queued tasks keep running
        // after the line below has been printed.
        exec.shutdown();
        System.out.println("Exiting thread test.");
    }
}
EDIT: Here's a working implementation of afk's suggestion. Still plan to look at the callback solution and hope for other suggestions.
import java.util.*;
import java.util.concurrent.*;
/**
 * Runs slow tasks on a fixed-size thread pool and stops at the first failure.
 *
 * Results are consumed in completion order through an
 * {@link ExecutorCompletionService}, so a failure is noticed as soon as it
 * happens instead of on the next polling pass. A task that throws is treated
 * as a failure too; previously its exception was swallowed and its future was
 * never removed, which kept the polling loop running forever.
 */
class MyFuture
{
    /**
     * A simulated slow calculation for a single item.
     * Item 42 fails immediately; every other item prints a trace line,
     * sleeps for one second and then reports success.
     */
    static private class MyWorker implements Callable<Boolean>
    {
        private final Integer item;

        public MyWorker(Integer item)
        {
            this.item = item;
        }

        /**
         * Runs the simulated calculation.
         *
         * @return {@code false} for item 42, {@code true} otherwise
         * @throws InterruptedException if the pool interrupts this worker
         *         while it sleeps (for example after another task failed)
         */
        @Override
        public Boolean call() throws InterruptedException
        {
            if (item == 42)
            {
                return false;
            }
            System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
            // Let interruption propagate: an interrupted calculation did not succeed.
            Thread.sleep(1000);
            return true;
        }
    }

    /** Number of worker threads in the pool. */
    static int NTHREADS = 4;

    /**
     * Runs the given tasks on a pool of {@code nThreads} threads until one fails.
     *
     * A task fails when it returns anything other than {@code true} or when it
     * throws. On the first observed failure the pool is shut down with
     * {@code shutdownNow()}: queued tasks are never started and running tasks
     * are interrupted.
     *
     * @param tasks    tasks to run, in submission order
     * @param nThreads size of the thread pool; must be positive when
     *                 {@code tasks} is not empty
     * @return the index in {@code tasks} of the first task observed to fail,
     *         or {@code -1} if every task succeeded
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for a task to complete
     */
    static int runUntilFailure(List<? extends Callable<Boolean>> tasks, int nThreads)
        throws InterruptedException
    {
        if (tasks.isEmpty())
        {
            return -1;
        }
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        try
        {
            CompletionService<Boolean> completion = new ExecutorCompletionService<Boolean>(exec);
            // Remember which position each future belongs to so a failure can be reported.
            Map<java.util.concurrent.Future<Boolean>, Integer> positions =
                new HashMap<java.util.concurrent.Future<Boolean>, Integer>();
            int next = 0;
            for (Callable<Boolean> task : tasks)
            {
                positions.put(completion.submit(task), next++);
            }
            for (int remaining = tasks.size(); remaining > 0; remaining--)
            {
                // Blocks until some task finishes; no polling or sleeping needed.
                java.util.concurrent.Future<Boolean> finished = completion.take();
                int index = positions.get(finished);
                boolean succeeded;
                try
                {
                    succeeded = Boolean.TRUE.equals(finished.get());
                }
                catch (ExecutionException ee)
                {
                    System.err.println("Task " + index + " threw: " + ee.getCause());
                    succeeded = false;
                }
                if (!succeeded)
                {
                    return index;
                }
            }
            return -1;
        }
        finally
        {
            // Always release the pool threads; after a failure this also
            // discards queued tasks and interrupts the ones still running.
            exec.shutdownNow();
        }
    }

    /**
     * Processes the items 1..99 and reports the first one that fails.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for results
     */
    public static void main(String args[]) throws InterruptedException
    {
        List<Integer> numbers = new ArrayList<Integer>();
        for (int i = 1; i < 100; i++)
        {
            numbers.add(i);
        }
        System.out.println("Starting thread test.");
        List<MyWorker> workers = new ArrayList<MyWorker>(numbers.size());
        for (Integer number : numbers)
        {
            workers.add(new MyWorker(number));
        }
        int failedIndex = runUntilFailure(workers, NTHREADS);
        if (failedIndex >= 0)
        {
            System.out.println("IMMEDIATE SHUTDOWN");
            System.out.println("Failed item: " + numbers.get(failedIndex));
        }
        System.out.println("Exiting thread test.");
    }
}