views:

308

answers:

8

I have a Java Thread like the following:

   public class MyThread extends Thread {
        MyService service;
        String id;
        public MyThread(String id) {
            this.id = node;
        }
        public void run() {
            User user = service.getUser(id)
        }
    }

I have about 300 ids, and every couple of seconds - I fire up threads to make a call for each of the id. Eg.

for(String id: ids) {
    MyThread thread = new MyThread(id);
    thread.start();
}

Now, I would like to collect the results from each threads, and do a batch insert to the database, instead of making 300 database inserts every 2 seconds.

Any idea how I can accomplish this?

+10  A: 

The canonical approach is to use a Callable and an ExecutorService. submitting a Callable to an ExecutorService returns a (typesafe) Future from which you can get the result.

class TaskAsCallable implements Callable<Result> {
    @Override
    public Result call() {
        return a new Result() // this is where the work is done.
    }
}

ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready

In your case, you probably want to use invokeAll which returns a List of Futures, or create that list yourself as you add tasks to the executor. To collect results, simply call get on each one.

daveb
Good post, +1 for the detail.
fastcodejava
+1  A: 

You need to store the result in a something like singleton. This has to be properly synchronized.

EDIT : I know this not the best advice as it is not good idea to handle raw Threads. But given the question this will work, won't it? I may not be upvoted, but why down vote?

fastcodejava
+1  A: 

You could create a queue or list which you pass to the threads you create, the threads add their result to the list which gets emptied by a consumer which performs the batch insert.

rsp
A: 

The simplest approach is to pass an object to each thread (one object per thread) that will contain the result later. The main thread should keep a reference to each result object. When all threads are joined, you can use the results.

AndiDog
+1  A: 
public class TopClass {
     List<User> users = new ArrayList<User>();
     void addUser(User user) {
         synchronized(users) {
             users.add(user);
         }
     }
     void store() throws SQLException {
        //storing code goes here
     }
     class MyThread extends Thread {
            MyService service;
            String id;
            public MyThread(String id) {
                this.id = node;
            }
            public void run() {
                User user = service.getUser(id)
                addUser(user);
            }
        }
}
Oso
+3  A: 

If you want to collect all of the results before doing the database update, you can use the invokeAll method. This takes care of the bookkeeping that would be required if you submit tasks one at a time, like daveb suggests.

private static final ExecutorService workers = Executors.newCachedThreadPool();

...

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
  tasks.add(new Callable<User>()
  {

    public User call()
      throws Exception
    {
      return svc.getUser(id);
    }

  });
}
/* invokeAll blocks until all service requests complete, 
 * or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
  User user = f.get();
  /* Add user to batch update. */
  ...
}
/* Commit batch. */
...
erickson
In my case, some of the service calls may never return or take too long to return. So 'invokeAll' along with 'awaitTermination(long timeout)' looks like way to go. Hence I am accepting this answer.Wish I could accept @daveb's answer too.
Langali
In that case, you can use an overloaded version of `invokeAll` that takes a timeout parameter. I'll update my answer to show how.
erickson
Thanks. I somehow overlooked it.
Langali
A: 

You could make a class which extends Observable. Then your thread can call a method in the Observable class which would notify any classes that registered in that observer by calling Observable.notifyObservers(Object).

The observing class would implement Observer, and register itself with the Observable. You would then implement an update(Observable, Object) method that gets called when Observerable.notifyObservers(Object) is called.

jonescb
+2  A: 

Store your result in your object. When it completes, have it drop itself into a synchronized collection (a synchronized queue comes to mind).

When you wish to collect your results to submit, grab everything from the queue and read your results from the objects. You might even have each object know how to "post" it's own results to the database, this way different classes can be submitted and all handled with the exact same tiny, elegant loop.

There are lots of tools in the JDK to help with this, but it is really easy once you start thinking of your thread as a true object and not just a bunch of crap around a "run" method. Once you start thinking of objects this way programming becomes much simpler and more satisfying.

Bill K
+1. My initial idea was similar, but Java 5 made it so much simpler now!
Langali
+1. That's beautiful.
Noel Ang
Did this for a system that pinged a class B address range. It was very elegant and cut the time down from hours to a few minutes. (Well, also changed from a Ping to an SNMP get, Java doesn't really support Ping)
Bill K