I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService and then needs to wait until all N items have been processed.

Caveats

  • N is not known in advance. If it were I would simply create a CountDownLatch and then have producer thread await() until all work was complete.
  • Using a CompletionService is inappropriate because, although my producer thread needs to block (i.e. by calling take()), there's no way of signalling that all work is complete in order to make the producer thread stop waiting.

My current favoured solution is to use an integer counter: increment it whenever an item of work is submitted and decrement it when a work item is processed. Following the submission of all N tasks my producer thread will need to wait on a lock, checking whether counter == 0 whenever it is notified. The consumer thread(s) will need to notify the producer if it has decremented the counter and the new value is 0.
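For concreteness, a rough sketch of what I have in mind (all names here are just illustrative):

import java.util.concurrent.ExecutorService;

public class Producer {
    private final Object lock = new Object();
    private int counter = 0; // guarded by lock

    public void submitAllAndWait(ExecutorService executor, Iterable<Runnable> work)
            throws InterruptedException {
        for (final Runnable r : work) {
            synchronized (lock) { counter++; }
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        synchronized (lock) {
                            if (--counter == 0) lock.notifyAll(); // wake the producer
                        }
                    }
                }
            });
        }
        // All N tasks submitted; wait until every one has been processed
        synchronized (lock) {
            while (counter > 0) lock.wait();
        }
    }
}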

Is there a better approach to this problem or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?

Thanks in advance.

+1  A: 

You could of course use a CountDownLatch protected by an AtomicReference so that your tasks get wrapped thus:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class MyTask implements Runnable {
    private final Runnable r;
    private final AtomicReference<CountDownLatch> l;

    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) {
        this.r = r;
        this.l = l;
    }

    public void run() {
        r.run();
        while (l.get() == null) { // spin until the latch is set (total task count known)
            try { Thread.sleep(1000L); } catch (InterruptedException e) { /* handle interruption */ }
        }
        l.get().countDown();
    }
}

Notice that the tasks run their work and then spin until the count-down latch is set (i.e. until the total number of tasks is known). As soon as the latch is set, they count it down and exit. The tasks get submitted as follows:

AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, latch));

After all of your work has been created and submitted, once you know how many tasks there are:

latch.set(new CountDownLatch(nTasks));
latch.get().await();
oxbow_lakes
Why does wrapping the CountDownLatch with AtomicReference help you in this case? The reference doesn't need to be protected, since it is passed to the MyTask in its constructor, and never changes after that.
Avi
Also, be sure to handle arbitrary throwables (RuntimeExceptions and Errors) in the run() method, at least by putting countDown() in a finally{} block.
Avi
And of course, the question specifically was about the case when you don't know the number of tasks at the time of task creation.
Avi
@Avi - you must know `nTasks` at some point (as you also assume in your response). The atomic reference helps because the `CDL` has to be safely set across multiple threads. My method *does work*, as does yours
oxbow_lakes
@oxbow_lakes: You are right - I didn't fully understand your code when I commented. I think the spinning could be avoided by using wait() and notifyAll().
Avi
A: 

I assume your producer does not need to know when the queue is empty but needs to know when the last task has been completed.

I would add a waitForWorkDone(producer) method to the consumer. The producer can add its N tasks and then call the wait method, which blocks the incoming thread until the work queue is empty and no tasks are executing.

A consumer thread calls notifyAll() on the wait-for lock when it has finished its task, the queue is empty and no other task is being executed.
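A minimal sketch of this idea, assuming the consumer tracks its queued and executing counts itself (all names are illustrative, not prescribed by the answer):

public class Consumer {
    private final Object lock = new Object();
    private int queued = 0;     // tasks waiting in the work queue
    private int executing = 0;  // tasks currently being processed

    public void addTask(Runnable task) {
        synchronized (lock) { queued++; }
        // ... enqueue the task for the worker threads ...
    }

    // Called by the producer after it has added its N tasks
    public void waitForWorkDone() throws InterruptedException {
        synchronized (lock) {
            while (queued > 0 || executing > 0) {
                lock.wait();
            }
        }
    }

    // Called by a worker thread around each task it takes from the queue
    void taskStarted() {
        synchronized (lock) { queued--; executing++; }
    }

    void taskFinished() {
        synchronized (lock) {
            executing--;
            if (queued == 0 && executing == 0) {
                lock.notifyAll(); // queue empty and last task done: wake the producer
            }
        }
    }
}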

rsp
+1  A: 

I've used an ExecutorCompletionService for something like this:

ExecutorCompletionService<Void> executor = ...; // e.g. wrapping an existing ExecutorService
int count = 0;
while (...) {
    executor.submit(new Processor()); // Processor assumed to implement Callable<Void>
    count++;
}

// Now, pull the completed futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get(); // take() blocks until the next task completes
}

This involves the completion service keeping a queue of completed Futures until you take() them, so if your set of tasks is arbitrarily large, your method might be better.

But if you do roll your own, make sure to use an AtomicInteger for the counter, so that you can increment it in one thread and decrement it in the worker threads.
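For instance, a rough sketch of that counter coordination, in fragments only (`pending`, `done` and `task` are illustrative names; note that the notifyAll() and the check-then-wait must both happen under the lock to avoid missed wakeups):

import java.util.concurrent.atomic.AtomicInteger;

final AtomicInteger pending = new AtomicInteger();
final Object done = new Object();

// Producer, for each task:
pending.incrementAndGet();
executor.submit(task);

// Worker, after processing each task:
if (pending.decrementAndGet() == 0) {
    synchronized (done) { done.notifyAll(); }
}

// Producer, after all N tasks have been submitted:
synchronized (done) {
    while (pending.get() > 0) done.wait();
}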

Avi
How can you `awaitTermination`? What is shutting down your service? Also, you cannot re-use the completion service in your answer anyway, because you may be waiting on tasks that were logically part of *another set*. And (as mentioned in comments to my answer), you still need to know `nTasks` at some point
oxbow_lakes
@oxbow_lakes: True. That is why I only had it as an option (which I've edited out, since it doesn't help in this case). I'm assuming that this completion service will not be used for another set of tasks at the same time (overlapping) with this one. It can be used again later, from the same thread.
Avi
The advantage of this way of keeping track of the number of tasks, rather than using an AtomicInteger, is that you don't have to handle the wait() and notify() manually - the queue takes care of that. All you have to track is the number of items in the queue.
Avi
So in essence you introduced a second queue, of task completions, that gets consumed by the producer. The consumer has all the information needed to send the correct signal once the producer makes it known that it has sent all its tasks. Imho the total count of tasks is an artifact that does not have to be modelled into the solution to this problem.
rsp
+2  A: 

java.util.concurrent.Phaser looks like it would work well for you. It is planned to be released in Java 7, but the most stable version can be found at jsr166's interest group website.

The Phaser is a glorified CyclicBarrier. You can register N parties and, when you're ready, await their advance at a specific phase.

A quick example of how it would work:

final Phaser phaser = new Phaser();

public Runnable getRunnable() {
    // Register in the submitting thread, before the task is handed to the
    // executor, so the producer cannot arrive before this party is registered
    phaser.register();
    return new Runnable() {
        public void run() {
            // ..do stuff...
            phaser.arrive(); // signal that this party has reached the phase
        }
    };
}

public void doWork() {
    phaser.register(); // register self
    for (int i = 0; i < N; i++) {
        executor.submit(getRunnable());
    }
    phaser.arriveAndAwaitAdvance(); // arrive, then block until all parties have arrived
}
John V.
Cool - Thanks; This looks like exactly what I'm after ... Can't believe they called it Phaser though.
Adamski