I have a database table and need to process its records 5 at a time for as long as the app is running. The workflow looks like this:

  1. Get a record that hasn't been processed yet and isn't currently being processed by another thread.
  2. Process it (this is a long process that depends on the internet connection, so it could time out or throw errors).
  3. Move to the next record. When the end of the table is reached, start again from the beginning.

I don't have much experience with threads, so I see two possible strategies:

Approach A.

1. Create a new ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

2. Add 5 tasks to it:

for (int i = 0; i < 5; i++) {
    taskExecutor.execute(new MyTask());
}

3. Each task will be an infinite loop that reads a record from the table, processes it, and then gets another record.

The problem with this approach is how to tell the other threads which records are currently being processed. To do this I can either use a "status" field in the table or just use a CopyOnWriteArraySet that holds the IDs currently being processed.
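A minimal sketch of the in-memory variant of that idea, assuming records are identified by integer IDs. The class name `ClaimSetSketch` and the helpers `tryClaim`, `release` and `processOnce` are hypothetical, and a plain `done` set stands in for the "processed" state that would really live in the table. The point it relies on is that `Set.add` on a CopyOnWriteArraySet is atomic and returns false when the element is already present, so only one thread can win the claim for a given ID.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ClaimSetSketch {
    // IDs that some worker is processing right now (hypothetical shared state).
    static final Set<Integer> inProgress = new CopyOnWriteArraySet<>();

    /** Returns true if the caller now owns the record, false if another thread holds it. */
    static boolean tryClaim(int id) { return inProgress.add(id); }

    static void release(int id) { inProgress.remove(id); }

    /** Lets `workers` threads walk the IDs once; returns the IDs that were processed. */
    static Set<Integer> processOnce(List<Integer> ids, int workers) {
        Set<Integer> done = ConcurrentHashMap.newKeySet(); // stand-in for a "processed" flag
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                for (int id : ids) {
                    if (done.contains(id) || !tryClaim(id)) continue; // skip finished or claimed
                    try {
                        if (!done.contains(id)) { // re-check now that we hold the claim
                            // real processing would go here
                            done.add(id);
                        }
                    } finally {
                        release(id); // always give the claim back, even on failure
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done;
    }
}
```

An in-memory set like this only coordinates threads inside one JVM and is lost on restart; the "status" column is the option that survives both.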

Approach B.

1. Create the same ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

2. Have an infinite loop that selects records that need to be processed and passes them to the executor:

while (true) {
    //get next record here
    taskExecutor.execute(new MyTask(record));
    //monitor the queue and wait until some thread is done processing,
    //so I can add another record
}

3. Each task processes a single record.

The problem with this approach is that I must not add tasks to the executor's queue faster than they are processed, or they will pile up over time. It means I need to monitor not only which tasks are currently running but also when they finish, so I can add new records to the queue.
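One way to get that throttling without inspecting the executor's queue is a counting semaphore: the submitting loop takes a permit before each `execute` call and every task returns its permit when it finishes. This is a sketch under assumptions, not the only option; the class `ThrottledSubmitSketch`, the method `run` and the 5 ms sleep that stands in for real work are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledSubmitSketch {
    /** Submits `total` tasks with at most `limit` in flight; returns {completed, peakInFlight}. */
    static int[] run(int total, int limit) {
        ExecutorService pool = Executors.newFixedThreadPool(limit);
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                permits.acquire(); // blocks while `limit` tasks are already submitted or running
                pool.execute(() -> {
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        Thread.sleep(5); // stand-in for processing one record
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // lets the submitting loop hand out the next record
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // no-op if everything already finished
        }
        return new int[] { completed.get(), peak.get() };
    }
}
```

Because the loop blocks in `acquire()`, the executor's queue never holds a backlog, and the next record is only fetched when a worker is actually free.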

Personally I think the first approach is better (easier), but I feel that the second one is more correct. What do you think? Or maybe I should do something completely different?

Also I can use Spring or Quartz libraries for this if needed.

Thanks.

+1  A: 

I would go with this approach:

Use one thread to distribute work. This thread will spawn 5 worker threads and go to sleep. When a worker thread finishes, it will wake up the distributor thread, which will then spawn a new worker thread and go back to sleep...

abababa22
+4  A: 

An alternative is to use an ArrayBlockingQueue of size 5. A single producer thread will go over the table, filling the queue up initially and then putting more records in as the consumers handle them. Five consumer threads will each take() a record, process it and go back for another record. This way the producer thread ensures that no record is given to two threads at once, and the consumer threads work on independent records. Java Concurrency in Practice will probably give you many more options, and is a great read for this type of problem.
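A rough sketch of that layout, assuming non-negative integer record IDs. The class `QueueSketch`, the method `run` and the `POISON` stop marker are hypothetical; the marker exists only so the example terminates, whereas the real producer would keep cycling over the table. Here the calling thread plays the producer role.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueSketch {
    static final Integer POISON = -1; // hypothetical "no more work" marker

    /** Feeds the IDs to `consumers` threads through a bounded queue; returns what they handled. */
    static List<Integer> run(List<Integer> ids, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(consumers);
        List<Integer> handled = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Integer id = queue.take(); // blocks until the producer supplies a record
                        if (id.equals(POISON)) return;
                        handled.add(id);           // stand-in for processing the record
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (Integer id : ids) queue.put(id);  // put() blocks while the queue is full
            for (int i = 0; i < consumers; i++) queue.put(POISON); // one marker per consumer
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

Since each element leaves the queue exactly once, no extra bookkeeping is needed to keep two consumers off the same record.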

Yuval F
Executors give you exactly the same pattern for submitting work to threads. You can configure queue, rejection policy, # of threads, # of max threads for peaks, etc.
Peter Štibraný
+1 for ArrayBlockingQueue
Seun Osewa
+4  A: 

I think that CompletionService (and ExecutorCompletionService) can help you.

You submit all your tasks via the completion service, and it lets you wait until one of the threads (any thread) finishes its task. This way you can submit the next task as soon as there is a free thread. This implies that you use approach B.

Pseudo code:

Create ThreadPoolExecutor and ExecutorCompletionService wrapping it

while (true) {
  int freeThreads = executor.getMaximumPoolSize() - executor.getActiveCount()
  fetch 'freeThreads' tasks and submit to completion service (which
                                      in turn sends it to executor)

  wait until completion service reports finished task (with timeout)
}

The timeout on the wait helps you avoid the situation where there were no tasks in the queue, so all threads are idle, and you are waiting for one of them to finish, which would never happen.

You can check for number of free threads via ThreadPoolExecutor methods: getActiveCount (active threads) and getMaximumPoolSize (max available configured threads). You will need to create ThreadPoolExecutor directly, or to cast object returned from Executors.newFixedThreadPool(), although I would prefer direct creation... see source of Executors.newFixedThreadPool() method for details.
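The pseudo code could look roughly like this in Java. It is a sketch with two deviations that are assumptions of the example, not part of the answer: it counts in-flight tasks itself instead of calling getActiveCount() (whose Javadoc describes the result as approximate), and it stops when the hypothetical `ids` list is exhausted rather than looping forever. The class `CompletionSketch` and the method `run` are illustrative names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompletionSketch {
    /** Processes the IDs with `threads` workers; returns how many tasks finished successfully. */
    static int run(List<Integer> ids, int threads) {
        // Same configuration that Executors.newFixedThreadPool(threads) uses internally.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CompletionService<Integer> completion = new ExecutorCompletionService<>(executor);
        Iterator<Integer> source = ids.iterator(); // stand-in for "fetch next record from DB"
        int inFlight = 0;
        int finished = 0;
        try {
            while (source.hasNext() || inFlight > 0) {
                // Top up: submit one task per free thread.
                while (inFlight < threads && source.hasNext()) {
                    Integer id = source.next();
                    completion.submit(() -> id); // the Callable stands in for real processing
                    inFlight++;
                }
                // Wait (with timeout) until any task reports completion.
                Future<Integer> f = completion.poll(1, TimeUnit.SECONDS);
                if (f == null) continue; // timed out; loop around and look for new work
                inFlight--;
                try {
                    f.get();
                    finished++;
                } catch (ExecutionException e) {
                    // the task threw; the record could be retried on a later pass
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return finished;
    }
}
```

Only the loop thread touches the record source, which is what keeps two workers from ever being handed the same record.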

Peter Štibraný
Thanks, but this looks pretty complicated and easy to screw up if not done properly. Is there something wrong with my first approach? To me it would be much simpler. I don't need to monitor anything except the records currently being processed.
serg
I don't think it's complicated. Your first approach uses 5 threads doing DB work, which can lead to 5 concurrent connections. Furthermore, like you found out, you need some kind of 'locking' for records so that threads don't start work on same task. This problem cannot occur here... there is only
Peter Štibraný
... one thread fetching tasks from DB (main while loop).
Peter Štibraný
+1  A: 

I would have a static collection in MyTask

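import java.util.ArrayList;
import java.util.List;

// RecordID, Record, getUnprocessedRecord() and SLEEP_INTERVAL are placeholders to be supplied by the application.
public class MyTask implements Runnable {
  // Shared by all workers; every access is guarded by the MyTask.class lock.
  private static final List<RecordID> processed = new ArrayList<RecordID>();
  private static final List<RecordID> processing = new ArrayList<RecordID>();

  private RecordID working = null;

  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      boolean claimed = false;
      synchronized (MyTask.class) {
        Record r = getUnprocessedRecord(); // use processed and processing to do query
        if (r == null) { // no more in table to process
          if (processing.isEmpty()) { // nothing is processing
            processed.clear(); // this should allow us to get some results on the next loop
          }
        } else {
          working = r.getRecordID();
          processing.add(working);
          claimed = true;
        }
      }
      if (!claimed) {
        try {
          Thread.sleep(SLEEP_INTERVAL); // sleep outside the lock so other workers are not blocked
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag; the loop condition ends the task
        }
        continue;
      }
      try {
        // do work
        synchronized (MyTask.class) {
          processed.add(working);
        }
      } catch (Exception e) {
        // work failed: the record is not marked processed, so getUnprocessedRecord() can return it again
      } finally {
        synchronized (MyTask.class) {
          processing.remove(working);
        }
      }
    }
  }

}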

Clint
A: 

My personal opinion: go for Quartz with Spring. It's the perfect choice. I have been using it in production for 2+ years now. Why try to reinvent the wheel when some people are already doing it at their best? Not to mention the different modes of running it provides. I would suggest that you at least try it out.

Franklin