In a framework I'm developing, the user can choose to run a certain time-consuming task in the background while doing something else. That task computes a series of results. At some point, when he/she needs the results from the background task, it's acceptable to wait some more time until:

  • a) a time-out occurs (in this case the user would like to get all the results computed so far, if they exist); or

  • b) a maximum number of computed results is reached (normal ending),

whichever happens first.

The gotcha is: Even if a time-out occurs, the user still wants the results computed so far.

I've tried to do this using Future<V>.get(long timeout, TimeUnit unit) and a Callable<V>-derived class, but when a TimeoutException occurs it usually means the task was cut short, so get() returns no results. Thus I had to add a getPartialResults() method (see DiscoveryTask below), and I'm afraid this usage is too counter-intuitive for the potential users.

Discovery invocation:

public Set<ResourceId> discover(Integer max, long timeout, TimeUnit unit)
    throws DiscoveryException
{
    DiscoveryTask task = new DiscoveryTask(max);
    Future<Set<ResourceId>> future = taskExec.submit(task);
    doSomethingElse();
    try {
        return future.get(timeout, unit);
    } catch (CancellationException e) {
        LOG.debug("Discovery cancelled.", e);
    } catch (ExecutionException e) {
        throw new DiscoveryException("Discovery failed to execute.", e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status for callers
        LOG.debug("Discovery interrupted.", e);
    } catch (TimeoutException e) {
        LOG.debug("Discovery time-out.");
    } catch (Exception e) {
        throw new DiscoveryException("Discovery failed unexpectedly.", e);
    } finally {
        // Harmless if task already completed
        future.cancel(true); // interrupt if running
    }
    return task.getPartialResults(); // Give me what you have so far!
}

Discovery realization:

public class DiscoveryTask
        implements Callable<Set<ResourceId>>, DiscoveryListener
{
    private final DiscoveryService discoveryService;
    private final Set<ResourceId> results;
    private final CountDownLatch doneSignal;
    private final MaximumLimit counter;
    //...
    public DiscoveryTask(Integer maximum) {
        this.discoveryService = ...;
        this.results = Collections.synchronizedSet(new HashSet<ResourceId>());
        this.doneSignal = new CountDownLatch(1);
        this.counter = new MaximumLimit(maximum);
        //...
    }

    /**
     * Gets the partial results even if the task was canceled or timed-out.
     * 
     * @return The results discovered until now.
     */
    public Set<ResourceId> getPartialResults() {
        Set<ResourceId> partialResults = new HashSet<ResourceId>();
        synchronized (results) {
            partialResults.addAll(results);
        }
        return Collections.unmodifiableSet(partialResults);
    }

    public Set<ResourceId> call() throws Exception {
        try {
            discoveryService.addDiscoveryListener(this);
            discoveryService.getRemoteResources();
            // Wait...
            doneSignal.await();
        } catch (InterruptedException consumed) {
            LOG.debug("Discovery was interrupted.");
        } catch (Exception e) {
            throw new Exception(e);
        } finally {
            discoveryService.removeDiscoveryListener(this);
        }
        LOG.debug("Discovered {} resource(s).", results.size());
        return Collections.unmodifiableSet(results);
    }

    // DiscoveryListener interface
    @Override
    public void discoveryEvent(DiscoveryEvent de) {
        if (counter.wasLimitReached()) {
            LOG.debug("Ignored discovery event {}. "
                + "Maximum limit of wanted resources was reached.", de);
            return;
        }
        if (doneSignal.getCount() == 0) {
            LOG.debug("Ignored discovery event {}. "
                + "Discovery of resources was interrupted.", de);
            return;
        }
        addToResults(de.getResourceId());
    }

    private void addToResults(ResourceId id) {
        if (counter.incrementUntilLimitReached()) {
            results.add(id);
        } else {
            LOG.debug("Ignored resource {}. Maximum limit reached.",id);
            doneSignal.countDown();
        }
    }
}

In chapter 6 of the book Java Concurrency in Practice by Brian Goetz et al., the authors show a solution to a related problem, but in that case all the results can be computed in parallel, which is not my case. To be precise, my results depend on external sources, so I have no control over when they arrive. My user defines the desired maximum number of results before invoking the task execution, and the maximum time he/she agrees to wait once he/she is ready to get the results.

Is this OK for you? Would you do it differently? Is there a better approach?

+2  A: 

Pass a (shorter) timeout to the task itself, and have it return prematurely when it reaches this 'soft timeout'. The result type can then have a flag telling whether the result is perfect or not:

// timeout * 0.9 is the 'soft timeout' the task enforces on itself
Future<Result> future = exec.submit(new Task(timeout * 0.9));
// If you get no result here then the task misbehaved,
// i.e. it didn't obey the soft timeout.
// This should be treated as a bug.
Result result = future.get(timeout, unit);
if (result.completedInTime()) {
    doSomethingWith(result.getData());        
} else {
    doSomethingElseWith(result.getData());       
}
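
The task side of this idea could look roughly like the sketch below. It is only an illustration: Result and SoftTimeoutTask are hypothetical names, and the sketch assumes the results arrive on a BlockingQueue, which is not how the question's code receives them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical result type: the data plus a flag saying whether the task ended normally.
final class Result<T> {
    private final List<T> data;
    private final boolean completedInTime;

    Result(List<T> data, boolean completedInTime) {
        this.data = data;
        this.completedInTime = completedInTime;
    }

    List<T> getData() { return data; }
    boolean completedInTime() { return completedInTime; }
}

// Hypothetical task: collects up to `max` items from `source`,
// but returns what it has once its own soft timeout expires.
final class SoftTimeoutTask<T> implements Callable<Result<T>> {
    private final BlockingQueue<T> source;
    private final int max;
    private final long softTimeoutNanos;

    SoftTimeoutTask(BlockingQueue<T> source, int max, long softTimeout, TimeUnit unit) {
        this.source = source;
        this.max = max;
        this.softTimeoutNanos = unit.toNanos(softTimeout);
    }

    @Override
    public Result<T> call() throws InterruptedException {
        // Assumption: the soft timeout is counted from the moment call() starts.
        long deadline = System.nanoTime() + softTimeoutNanos;
        List<T> data = new ArrayList<T>();
        while (data.size() < max) {
            // Wait only for the time that is left before the soft deadline.
            T item = source.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (item == null) {
                return new Result<T>(data, false); // soft timeout: partial results
            }
            data.add(item);
        }
        return new Result<T>(data, true); // maximum reached: normal ending
    }
}
```

Note that this sketch starts the clock when the task starts running. If the waiting period should only begin when the caller asks for the results, the deadline would have to be handed to the task at that point instead.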
disown
A: 

If individual results come "cheap", then make your routine generate results and add them to an output queue, then check how much time is left and compare it to the time the current results took. Only calculate the next result if you can do it in time.

This will allow you to stay single-threaded, which generally tends to make your code simpler and hence less error-prone.
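
A minimal single-threaded sketch of that loop, under some assumptions: BudgetedCollector and its collect method are hypothetical names, each result is produced by a Supplier, and the cost of the next result is estimated as the slowest step seen so far (the answer does not say which estimate to use).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class BudgetedCollector {
    // Hypothetical helper: calls `step` up to `max` times, stopping early when the
    // slowest step seen so far would no longer fit in the remaining time budget.
    static <T> List<T> collect(Supplier<T> step, int max, long budgetNanos) {
        long deadline = System.nanoTime() + budgetNanos;
        long slowest = 0; // duration of the slowest step so far, in nanoseconds
        List<T> results = new ArrayList<T>();
        while (results.size() < max) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= slowest) {
                break; // the next result probably cannot be computed in time
            }
            long start = System.nanoTime();
            results.add(step.get());
            slowest = Math.max(slowest, System.nanoTime() - start);
        }
        return results;
    }
}
```

This only works when a step returns in a predictable amount of time; a step that blocks indefinitely cannot be cut short without a second thread.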

Thorbjørn Ravn Andersen
Well, I can't say when the next result will be available. It's a distributed system, and the results are based on the responses from other machines in the network. They can answer me or not, the network can fail, etc. The computation based on their answer is, however, cheap.
A: 

Something like this:

public static class Consumer<T> {

    private BlockingQueue<T> queue;
    private T lastElement;

    public Consumer(BlockingQueue<T> queue, T lastElement) {
        this.queue = queue;
        this.lastElement = lastElement;
    }

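    public Collection<T> acquireResults(long timeout, TimeUnit timeoutTimeUnit) throws InterruptedException {

        LinkedList<T> result = new LinkedList<T>();
        long deadline = System.nanoTime() + timeoutTimeUnit.toNanos(timeout);

        // Collect elements until the end marker arrives or the timeout expires
        while (true) {
            T element = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (element == null || element == lastElement)
                return result; // timed out (partial results) or finished
            result.add(element);
        }

    }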

}

public static void main(String[] args) throws InterruptedException {

    String lastElement = "_lastElement";

    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    Consumer<String> consumer = new Consumer<String>(queue, lastElement);

    for (int i=0; i<100; i++)
        queue.put(UUID.randomUUID().toString());

    System.out.println(consumer.acquireResults(5, TimeUnit.SECONDS));

    queue.put("foo");
    queue.put("bar");

    queue.put(lastElement);

    System.out.println(consumer.acquireResults(5, TimeUnit.SECONDS));

}

You will probably want to use a result wrapper to indicate that some result is indeed the last result instead of using a magic value.
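
One possible shape for such a wrapper is sketched below. Item and WrappedConsumer are hypothetical names chosen for illustration, and the sketch assumes the producer puts Item.end() on the queue once it has nothing more to deliver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: either carries a value or marks the end of the results.
final class Item<T> {
    private static final Item<?> END = new Item<Object>(null);
    private final T value;

    private Item(T value) { this.value = value; }

    static <T> Item<T> of(T value) { return new Item<T>(value); }

    @SuppressWarnings("unchecked")
    static <T> Item<T> end() { return (Item<T>) END; }

    boolean isEnd() { return this == END; }
    T get() { return value; }
}

final class WrappedConsumer {
    // Returns everything received before the end marker or the timeout, whichever comes first.
    static <T> List<T> acquireResults(BlockingQueue<Item<T>> queue, long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<T> results = new ArrayList<T>();
        while (true) {
            Item<T> item = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (item == null || item.isEnd()) {
                return results; // timed out or producer finished
            }
            results.add(item.get());
        }
    }
}
```

Because the end marker is a distinct object rather than a reserved value, any value of T, including one that happens to equal "_lastElement", can be a legitimate result.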

yawn