+1  A: 

Check Semaphore

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it.

So, whenever your thread finishes, it releases one permit, which is then acquired by the main thread.
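Below is a rough sketch of that pattern (illustration only, not code from this answer; the worker names and sleep time are invented). Each worker releases a permit when it finishes, and the main thread blocks in acquire() until the first permit arrives:

import java.util.concurrent.Semaphore;

public class SemaphoreSketch {
    public static void main(String[] args) throws InterruptedException {
        // zero permits to start with; each worker releases one when it finishes
        final Semaphore finished = new Semaphore(0);

        Runnable worker = new Runnable() {
            public void run() {
                try {
                    Thread.sleep(200);      // simulated work
                } catch (InterruptedException exc) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.release();     // frees one permit: "I'm done"
                }
            }
        };

        new Thread(worker, "a").start();
        new Thread(worker, "b").start();

        finished.acquire();                 // blocks until the first worker finishes
        System.out.println("One worker has finished; task C could start here");
    }
}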

Bozho
Why is this better than pooling? It seems that it just introduces an additional level of complexity (explicit management of resources) which he should not really need to deal with.
danben
He said he'd like to avoid thread pools. I didn't say this is a better option though, it's simply an option :)
Bozho
+1  A: 

You should use a thread pool. In a thread pool, you have a fixed number of threads and tasks are kept in a queue; whenever a thread is available, a task is taken off the queue and executed by that thread.

Here is a link to the Sun tutorial on thread pooling.

Edit: just noticed that you wrote in your answer that you think you cannot use thread pooling. I don't see why this is the case. You can set threads to be created on-demand rather than all at once if you are worried about creation overhead, and once created an idle thread is not really going to hurt anything.

You also say that it's the main thread's decision to create a new Thread or not, but does it really need to be? I think that may just overcomplicate things for you.
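Below is a rough sketch of that suggestion (not code from this answer; the pool size of 2 and the task names are invented). The fixed-size pool creates its worker threads lazily, as tasks are submitted:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSketch {
    public static void main(String[] args) throws InterruptedException {
        // fixed-size pool; its worker threads are created on demand as tasks arrive
        ExecutorService pool = Executors.newFixedThreadPool(2);

        for (final String name : new String[] { "a", "b", "c" }) {
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("Task " + name + " running on "
                            + Thread.currentThread().getName());
                }
            });
        }

        pool.shutdown();                              // accept no new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for queued tasks to finish
    }
}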

danben
-1 it does make sense to use a thread pool rather than creating threads, but then you have exactly the same problem of launching task C when either task A or B completes, which is what the question was about.
Pete Kirkham
@Pete Kirkham: Maybe I'm missing something, but I don't see what the problem is. If C hasn't started because we want to limit the number of executing threads, then the ExecutorService takes care of this automatically. If C hasn't started because it depends on the completion of A, then A can add C to the queue before completing. Nothing in the question seems to indicate that C depends on (A or B); it seems like a C happens whenever an A or a B is done.
danben
I read the OP's code as meaning that the intent was to launch *the* C task when *either* A or B finishes. It's not entirely clear; I've asked for clarification. It's also not really the job of a thread pool to control communication between the tasks.
Pete Kirkham
Sorry, I forgot to mention that C depends on A or B. For example, if A failed to load the picture, then thread C should load the missing bytes. But thread A should not take care of creating thread C. :) Also, the number of threads can change: for example, the user can increase the number of threads used, and then the main thread should create another thread D. Maybe I need to read the articles about Thread Pools again :)
Biggie
@Pete Kirkham: from this last comment and the one above it sounds like there can be multiple Cs. Regardless, notice that I didn't suggest that the thread pool should be controlling communication between tasks - this would be the job of the workflow, and that certainly does not preclude the use of a pool.
danben
A: 

Is there a reason to control the thread execution directly instead of using something like ExecutorService?

@danben got there first, but I fell into the same pooling trap.

Quotidian
A: 

A lot of the complexity in your code comes from the main thread trying to wait on two different objects. There's nothing that says you can't use wait and notify on some other shared object, and if your tasks are (A or B) then C, the code below will work: wait on a reference which is set to indicate the first task to complete.

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class BiggieThreads
{
    private static class MyTask implements Runnable
    {
        final int steps;
        final AtomicReference<MyTask> shared;
        final String name;

        MyTask ( int steps, AtomicReference<MyTask> shared, String name )
        {
            this.shared = shared;
            this.steps = steps;
            this.name = name;
        }

        @Override
        public void run()
        {
            for ( int i = 1; i <= steps; i++ ) {
                System.out.println ( "Running: " + this  + " " + i + "/" + steps);
                try {
                    Thread.sleep ( 100 );
                } catch ( InterruptedException exc ) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
            }

            // notify if this is the first to complete
            if ( shared.compareAndSet ( null, this ) )
                synchronized ( shared ) {
                    shared.notify();
                }

            System.out.println ( "Completed: " + this );
        }

        @Override
        public String toString ()
        {
            return name;
        }
    }

    public void startTest() throws InterruptedException
    {
        final ExecutorService pool = Executors.newFixedThreadPool ( 3 );
        final AtomicReference<MyTask> shared = new AtomicReference<MyTask>();

        Random random = new Random();

        synchronized ( shared ) {
            // tasks launched while lock on shared held to prevent
            // them notifying before this thread waits
            pool.execute ( new MyTask ( random.nextInt ( 5 ) + 3, shared, "a" ) );
            pool.execute ( new MyTask ( random.nextInt ( 5 ) + 3, shared, "b" ) );

            // loop to guard against spurious wakeups: only proceed once a
            // task has actually published itself to the shared reference
            while ( shared.get() == null )
                shared.wait();
        }

        System.out.println ( "Reported: " + shared.get() );

        pool.shutdown();
    }

    public static void main ( String[] args ) throws InterruptedException
    {
        BiggieThreads test = new BiggieThreads ();
        test.startTest();
    }
}

I'd tend to use a semaphore for this job in production, as although the wait is quite simple, using a semaphore puts a name to the behaviour, so there's less to work out when you next read the code.

Pete Kirkham
Then the new thread C should be created in the synchronized block after the wait statement? But I think it is not possible to increase the number of parallel threads of the thread pool at runtime, right?
Biggie
@Biggie It doesn't have to be within the block unless it also cares about notifying the shared object. You shouldn't really care about limiting the number of threads; use a `newCachedThreadPool` and limit the number of active download tasks instead.
Pete Kirkham
In your code, shared.get() is printed outside the block. But what happens if both thread A and thread B notify one after another just before get() is called? E.g. the download in A fails and both A and B notify; then I could miss that A failed and could not load the last bytes. In this case I need to put that code into the block, right? newCachedThreadPool sounds good. THX
Biggie
+2  A: 

This sounds like a classic case for using a ThreadPoolExecutor to perform the tasks concurrently, wrapped in an ExecutorCompletionService for collecting the results as they arrive.

For example, assuming that tasks contains a set of tasks to execute in parallel, each returning a String value when it terminates, the code to process the results as they become available can be something like:

List<Callable<String>> tasks = ....;
ExecutorService ex = Executors.newFixedThreadPool(10);
ExecutorCompletionService<String> ecs = new ExecutorCompletionService<String>(ex);
for (Callable<String> task : tasks)
    ecs.submit(task);
for (int i = 0; i < tasks.size(); i++) {
    String result = ecs.take().get(); // blocks until the next task completes
    //Do something with result
}
ex.shutdown(); // no more submissions; let the pool's threads exit

If you include the identity of the task as a part of the returned value, then you can make decisions depending on the completion order.
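A rough, self-contained sketch of that idea (not from the original answer; the class name, pool size and simulated delays are invented): each task returns its own identity, so the first value taken from the completion service tells you which of A or B finished first.

import java.util.Random;
import java.util.concurrent.*;

public class CompletionOrderSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<String>(pool);

        // each task returns its own identity as its result
        for (final String name : new String[] { "A", "B" }) {
            ecs.submit(new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(new Random().nextInt(500));  // simulated download
                    return name;
                }
            });
        }

        // take() returns futures in completion order, so the first result
        // identifies whichever of A or B finished first
        String first = ecs.take().get();
        System.out.println(first + " finished first; launch the matching C task here");

        ecs.take().get();   // drain the remaining result
        pool.shutdown();
    }
}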

Eyal Schneider