I have a queue of tasks in Java. The queue is stored in a database table.

I need to:

  • 1 thread per task only
  • No more than N threads running at the same time. The threads interact with the DB, and I don't want to have a bunch of DB connections open.

I think I could do something like:

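final Semaphore semaphore = new Semaphore(N);
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        final CountDownLatch cdl = new CountDownLatch(tasks.size());
        for (final JobTask task : tasks) {
            Thread tr = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire(); // blocks until one of the N permits is free
                        try {
                            task.doWork();
                        } finally {
                            semaphore.release(); // return the permit even if doWork() throws
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        cdl.countDown(); // always count down so await() can return
                    }
                }
            });
            tr.start(); // one thread per task; each waits for a permit before working
        }
        cdl.await(); // wait for the whole batch before fetching more tasks
    }
}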

I know that ExecutorService exists, but I'm not sure whether I can use it for this.

So, do you think this is the best way to do it? Or could you explain how ExecutorService could be used to solve this?

final solution:

I think the best solution is something like:

while (isOnJob) {
    ExecutorService executor = Executors.newFixedThreadPool(N);
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        for (final JobTask task : tasks) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    task.doWork();
                }
            });
        }
    }
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}

Thanks a lot for the answers. BTW, I am using a connection pool, but the queries to the DB are very heavy and I don't want an uncontrolled number of tasks running at the same time.

+6  A: 

You can indeed use an ExecutorService. For instance, create a new fixed thread pool using the newFixedThreadPool method. This way, besides reusing threads, you also guarantee that no more than N threads are running at the same time.

Something along these lines:

private static final ExecutorService executor = Executors.newFixedThreadPool(N);
// ...
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final JobTask task : tasks) {
            Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    task.doWork();
                }
            });
            futures.add(future);
        }
        // No latch needed: get() blocks until the task has finished.
        // It throws InterruptedException and ExecutionException, which the caller must handle.
        for (Future<?> fut : futures) {
            fut.get();
        }
    }
}

Note that you no longer need to use the latch, as get will wait for the computation to complete, if necessary.
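One detail worth spelling out: if a task throws, get rethrows the failure wrapped in an ExecutionException, so waiting on the futures is also where task errors surface. Below is a minimal sketch of that, assuming Java 8 or later; the class name FutureGetDemo and the method countFailures are made up for illustration, and plain Runnable stands in for JobTask.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: names are illustrative, not from the question.
public class FutureGetDemo {

    /** Runs all tasks on at most n threads and returns how many of them threw. */
    static int countFailures(List<Runnable> tasks, int n) {
        ExecutorService executor = Executors.newFixedThreadPool(n);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable task : tasks) {
                futures.add(executor.submit(task));
            }
            int failures = 0;
            for (Future<?> future : futures) {
                try {
                    future.get(); // blocks until this task has finished
                } catch (ExecutionException e) {
                    failures++; // e.getCause() is the exception thrown by the task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop waiting if interrupted
                    break;
                }
            }
            return failures;
        } finally {
            executor.shutdown(); // let the pool threads exit once the work is done
        }
    }

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<>();
        tasks.add(() -> { });
        tasks.add(() -> { throw new IllegalStateException("simulated failure"); });
        System.out.println("failed tasks: " + countFailures(tasks, 2));
    }
}
```

A failing task does not stop the others here; each future is checked independently.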

JG
So it seems I don't need the semaphore either, do I?
damian
Yes, you can also remove it.
JG
A: 

Achieving good performance also depends on the kind of work that needs to be done in the threads. If your DB is the bottleneck in processing, I would start by paying attention to how your threads access the DB. A connection pool is probably in order. It might help you achieve more throughput, since worker threads can reuse DB connections from the pool.

Jeroen van Bergen
+4  A: 

I agree with JG that ExecutorService is the way to go... but I think you're both making it more complicated than it needs to be.

Rather than creating a large number of threads (1 per task) why not just create a fixed sized thread pool (with Executors.newFixedThreadPool(N)) and submit all the tasks to it? No need for a semaphore or anything like that - just submit the jobs to the thread pool as you get them, and the thread pool will handle them with up to N threads at a time.

If you aren't going to use more than N threads at a time, why would you want to create them?
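To make the point concrete, here is a small sketch that submits more tasks than there are threads and records how many ran at once. It assumes Java 8 or later; the class name FixedPoolDemo, the method peakConcurrency, and the sleep standing in for task.doWork() are all invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: names are illustrative, not from the question.
public class FixedPoolDemo {

    /** Runs taskCount dummy tasks on poolSize threads; returns the peak number running at once. */
    static int peakConcurrency(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                try {
                    Thread.sleep(20); // stand-in for task.doWork()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; queued ones still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 20 tasks, but never more than 3 running at the same time
        System.out.println("peak concurrency: " + peakConcurrency(3, 20));
    }
}
```

All 20 tasks are accepted immediately; the pool's internal queue holds the ones that are waiting for a free thread.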

Jon Skeet
+1  A: 

Use a ThreadPoolExecutor instance with an unbounded queue and a fixed maximum number of threads, e.g. Executors.newFixedThreadPool(N). This will accept a large number of tasks but will only execute N of them concurrently.

If you choose a bounded queue instead (with a capacity of N), the executor will reject new tasks once all threads are busy and the queue is full. How exactly it rejects them depends on the policy you configure when working with ThreadPoolExecutor directly instead of using the Executors factory (see RejectedExecutionHandler).
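As a rough sketch of the bounded-queue variant, assuming Java 8 or later: the class name BoundedPoolDemo and the method runAll are made up, and CallerRunsPolicy is just one of the built-in handlers, picked here for illustration. The default handler, AbortPolicy, throws RejectedExecutionException instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: names are illustrative, not from the question.
public class BoundedPoolDemo {

    /** Pushes taskCount dummy tasks through n threads and a queue of capacity n; returns how many completed. */
    static int runAll(int n, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                n, n,                                   // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,              // keep-alive for idle extra threads (none here)
                new ArrayBlockingQueue<Runnable>(n),    // at most n tasks waiting
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, the submitter runs the task
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // stand-in for task.doWork()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(2, 20));
    }
}
```

Note that with CallerRunsPolicy the submitting thread does work too, so up to N + 1 tasks can run at once. If the limit on DB connections is strict, size the pool with that in mind or use a different handler.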

If you need "real" congestion control, you should set up a bounded BlockingQueue with a capacity of N. Fetch the tasks you want done from the database and put them into the queue; if it is full, the calling thread will block. In another thread (perhaps also started using the Executor API), take tasks from the BlockingQueue and submit them to the Executor. If the BlockingQueue is empty, that thread will block as well. To signal that you're done, use a "special" object (e.g. a singleton that marks the last item in the queue).
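Here is a minimal sketch of the bounded queue with a sentinel object, assuming Java 8 or later. It is a simplified variation of the description above: the N workers take from the queue themselves rather than going through a separate dispatcher thread, and the producer puts one sentinel per worker. The names PoisonPillDemo, process, and POISON are invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: names are illustrative, not from the question.
public class PoisonPillDemo {

    // Sentinel marking the end of the work; compared by identity, never run.
    private static final Runnable POISON = new Runnable() {
        @Override
        public void run() { }
    };

    /** Feeds taskCount dummy tasks to n workers through a queue of capacity n; returns how many ran. */
    static int process(int n, int taskCount) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(n);
        ExecutorService workers = Executors.newFixedThreadPool(n);
        AtomicInteger done = new AtomicInteger();

        for (int w = 0; w < n; w++) {
            workers.execute(() -> {
                try {
                    // take() blocks while the queue is empty
                    for (Runnable task = queue.take(); task != POISON; task = queue.take()) {
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            for (int i = 0; i < taskCount; i++) {
                queue.put(() -> done.incrementAndGet()); // put() blocks while the queue is full
            }
            for (int w = 0; w < n; w++) {
                queue.put(POISON); // one sentinel per worker
            }
            workers.shutdown();
            workers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + process(3, 50));
    }
}
```

Because put blocks when the queue is full, the producer can never get more than N tasks ahead of the workers, which keeps memory use flat when the task table is large.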

yawn