Hi,

I am using the ThreadPoolExecutor to implement threading in my Java Application.

I have an XML document to parse, and I hand each of its nodes to the thread pool as a separate task. My implementation looks like this:

parse_tp is the ThreadPoolExecutor instance and ParseQuotesXML is the class with the run method.

    try {
        List children = root.getChildren();
        Iterator iter = children.iterator();

        // Parsing the XML
        while (iter.hasNext()) {
            Element child = (Element) iter.next();
            ParseQuotesXML quote = new ParseQuotesXML(child, this);
            parse_tp.execute(quote);
        }
        System.out.println("Print it after all the threads have completed");
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        System.out.println("Print it in the end.");
        if (!parse_tp.isShutdown()) {
            if (parse_tp.getActiveCount() == 0 && parse_tp.getQueue().size() == 0) {
                parse_tp.shutdown();
            } else {
                try {
                    parse_tp.awaitTermination(30, TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    log.info("Exception while terminating the threadpool " + ex.getMessage());
                    ex.printStackTrace();
                }
            }
        }
        parse_tp.shutdown();
    }

The problem is that both print statements run before the worker threads have finished. I want the main thread to wait for all the other threads to complete. With plain Thread objects I could do this with join(), but I can't find a way to achieve the same with ThreadPoolExecutor. I would also like to ask whether the code in the finally block is a proper way to close the thread pool.

Thanks, Amit

A: 

A CountDownLatch is designed for this very purpose. Here's a good example. When the number of threads is not known in advance, consider an UpDownLatch.
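
A minimal sketch of the latch approach, assuming Java 8 or later. The class name LatchExample, the method processAll, and the use of strings as stand-ins for XML nodes are all hypothetical. The count only has to be known when the latch is created, so the size of the list being iterated is enough:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; strings stand in for the XML nodes.
class LatchExample {
    // Runs one task per item and blocks until every task has finished.
    static int processAll(List<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // One count per task; the list size is known before anything is submitted.
        CountDownLatch done = new CountDownLatch(items.size());
        AtomicInteger processed = new AtomicInteger();
        try {
            for (String item : items) {
                pool.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for parsing one node
                    } finally {
                        done.countDown(); // count down even if the task fails
                    }
                });
            }
            done.await(); // the calling thread waits here, much like join()
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", ex);
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }
}
```

Calling countDown() in a finally block matters: if a task threw before counting down, await() would otherwise never return.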

trashgod
Thanks trashgod, but I don't have an exact count of the XML nodes I need to parse, so I won't be able to use CountDownLatch. I wasn't aware that Java had anything like this, though, so thanks a lot.
Amit
Excellent. As suggested above, `Future` is more flexible, but I added a link to an UpDownLatch example, too.
trashgod
A: 

First of all, you can use the ThreadPoolExecutor.submit() method, which returns a Future instance. After you have submitted all your work items, you can iterate through those futures and call Future.get() on each of them.
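
As a rough illustration of the submit-then-get pattern, assuming Java 8 or later (the class SubmitAndGet, the method totalLength, and the string "nodes" are hypothetical stand-ins for the real parsing tasks):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class; each "node" is just a string here.
class SubmitAndGet {
    // Submits one task per node, then waits for every result.
    static int totalLength(List<String> nodes) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String node : nodes) {
                Callable<Integer> task = () -> node.length(); // stand-in for real work
                futures.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // blocks until this task is done
            }
            return total;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", ex);
        } catch (ExecutionException ex) {
            // get() wraps any exception thrown inside the task
            throw new IllegalStateException("A task failed", ex.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Once the second loop finishes, every submitted task has completed, so anything after it runs only when all the work is done.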

Alternatively, you can prepare your work items as Callable objects and submit them all at once using ThreadPoolExecutor.invokeAll(), which waits until all work items have completed. You can then get each execution result or exception by calling the same Future.get() method.
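
A small sketch of the invokeAll() variant, again assuming Java 8 or later; InvokeAllExample and squares are hypothetical names and the squaring is only a placeholder for real work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class showing invokeAll() as the waiting point.
class InvokeAllExample {
    static List<Integer> squares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer n : inputs) {
                tasks.add(() -> n * n); // placeholder for the real work
            }
            // Blocks until every task has completed (normally or with an exception).
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // already done, returns immediately
            }
            return results;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("A task failed", ex.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The returned futures are in the same order as the submitted task list, so results line up with inputs.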

Eugene Kuleshov
A: 

To answer your second question, I think you are doing a reasonable job trying to clean up your thread pool.
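
One detail worth keeping in mind for the cleanup: awaitTermination() waits for the pool to terminate after a shutdown request, so the usual order is shutdown() first and awaitTermination() second. A hedged sketch of that idiom follows; PoolShutdown and shutdownAndWait are hypothetical names, and the fallback to shutdownNow() is one possible policy rather than the only one:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the shutdown-then-await order.
class PoolShutdown {
    // Returns true if the pool terminated within the allowed time.
    static boolean shutdownAndWait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks accepted; already submitted tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks and drop queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Used this way, a call such as shutdownAndWait(pool, 30, TimeUnit.SECONDS) also doubles as a join point: when it returns true, every submitted task has finished.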

With respect to your first question, I think the method that you want to use is submit rather than execute. Rather than try to explain it all in text, here's an edited fragment from a unit test that I wrote that makes many tasks, has each of them do a fragment of the total work and then meets back at the starting point to add the results:

    final AtomicInteger messagesReceived = new AtomicInteger(0);

    // ThreadedListenerAdapter is the class that I'm testing 
    // It's not germane to the question other than as a target for a thread pool.
    final ThreadedListenerAdapter<Integer> adapter = 
        new ThreadedListenerAdapter<Integer>(listener);
    int taskCount = 10;

    List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

    for (int whichTask = 0; whichTask < taskCount; whichTask++) {
        FutureTask<Integer> futureTask = 
            new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Does useful work that affects messagesSent
                return messagesSent;
            }
        });
        taskList.add(futureTask);
    }

    for (FutureTask<Integer> task : taskList) {
        LocalExecutorService.getExecutorService().submit(task);
    }

    for (FutureTask<Integer> task : taskList) {
        int result = 0;
        try {
            result = task.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            throw new RuntimeException("ExecutionException in task " + task, ex);
        }
        assertEquals(maxMessages, result);
    }

    int messagesSent = taskCount * maxMessages;
    assertEquals(messagesSent, messagesReceived.intValue());

I think this fragment is similar to what you're trying to do. The key components were the submit and get methods.

Bob Cross