I initially asked this question here, but I've realized that my question is not about a while-true loop. What I want to know is, what's the proper way to do high-performance asynchronous message-passing in Java?

What I'm trying to do...

I have ~10,000 consumers, each consuming messages from their private queues. I have one thread that's producing messages one by one and putting them in the correct consumer's queue. Each consumer loops indefinitely, checking for a message to appear in its queue and processing it.

I believe the term is "single-producer/single-consumer", since there's one producer, and each consumer only works on their private queue (multiple consumers never read from the same queue).

Inside Consumer.java:

@Override
public void run() {
    while (true) {
        Message msg = messageQueue.poll();
        if (msg != null) {
            ... // do something with the message
        }
    }
}

The Producer is putting messages inside Consumer message queues at a rapid pace (several million messages per second). Consumers should process these messages as fast as possible!

Note: the while (true) { ... } is terminated by a KILL message sent by the Producer as its last message.

However, my question is about the proper way to design this message-passing. What kind of queue should I use for messageQueue? Should it be synchronous or asynchronous? How should Message be designed? Should I use a while-true loop? Should Consumer be a thread, or something else? Will 10,000 threads slow down to a crawl? What's the alternative to threads?

So, what's the proper way to do high-performance message-passing in Java?

+2  A: 

I would say that the context-switching overhead of 10,000 threads is going to be very high, not to mention the memory overhead. By default, on 32-bit platforms, each thread gets a stack of 256 KB, so 10,000 threads need roughly 2.5 GB for stacks alone. Obviously you're talking 64-bit, but even so, that's quite a large amount of memory. With that much memory in use, the cache is going to thrash a lot, and the CPU will be throttled by memory bandwidth.

I would look for a design that uses far fewer threads, to avoid allocating large amounts of stack and to cut context-switching overhead. You cannot run 10,000 threads concurrently. Current hardware typically has fewer than 100 cores.

I would create one queue per hardware thread and dispatch messages in a round-robin fashion. If the processing times vary considerably, there is the danger that some threads finish processing their queue before they are given more work, while other threads never get through their allotted work. This can be avoided by using work stealing, as implemented in the JSR-166 ForkJoin framework.
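
As a rough illustration of the one-queue-per-hardware-thread idea, here is a minimal sketch of a round-robin dispatcher. The class name `RoundRobinDispatcher` and its methods are made up for this example, and it assumes a single producer thread, as in the question. It does not attempt work stealing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: hands each message to the next worker queue in turn.
class RoundRobinDispatcher<M> {
    private final List<BlockingQueue<M>> queues = new ArrayList<>();
    private int next = 0; // only the single producer thread touches this

    RoundRobinDispatcher(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // One queue per hardware thread reported by the JVM.
    static <M> RoundRobinDispatcher<M> perHardwareThread() {
        return new RoundRobinDispatcher<>(Runtime.getRuntime().availableProcessors());
    }

    // Called by the producer only; returns the index of the queue that was used.
    int dispatch(M message) {
        int index = next;
        queues.get(index).add(message);
        next = (next + 1) % queues.size();
        return index;
    }

    // Each worker thread takes messages from its own queue.
    BlockingQueue<M> queue(int index) {
        return queues.get(index);
    }

    int size() {
        return queues.size();
    }
}
```

Each worker thread would loop on `queue(i).take()`. Note that plain round-robin spreads messages for one consumer across several workers, so it only fits when per-consumer ordering does not matter.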

Since communication is one-way from the publisher to the subscribers, Message does not need any special design, assuming the subscriber doesn't change the message once it has been published.

EDIT: Reading the comments, if you have 10,000 symbols, then create a handful of generic subscriber threads (one subscriber thread per core) that asynchronously receive messages from the publisher (e.g. via their message queue). The subscriber pulls the message from the queue, retrieves the symbol from the message, looks it up in a Map of message handlers, and invokes the handler to process the message synchronously. Once done, it repeats, fetching the next message from the queue. If messages for the same symbol have to be processed in order (which is why I'm guessing you wanted 10,000 queues), you need to map symbols to subscribers. E.g. if there are 10 subscribers, then symbols 0-999 go to subscriber 0, 1000-1999 to subscriber 1, etc. A more refined scheme is to map symbols according to their frequency distribution, so that each subscriber gets roughly the same load. For example, if 10% of the traffic is symbol 0, then subscriber 0 will deal with just that one symbol and the other symbols will be distributed amongst the other subscribers.
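
The range mapping and handler lookup described above might look roughly like this. It is only a sketch: `MessageHandler`, `SymbolRouter` and `SubscriberHandlers` are names invented for the example, symbols are assumed to be numbered from 0, and the payload is a plain String for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback type; the answer only speaks of a "message handler".
interface MessageHandler {
    void handle(String payload);
}

// Assumes symbols are numbered 0..symbolCount-1.
class SymbolRouter {
    private final int symbolsPerSubscriber;

    SymbolRouter(int symbolCount, int subscriberCount) {
        // Ceiling division so every symbol lands on a valid subscriber index.
        this.symbolsPerSubscriber = (symbolCount + subscriberCount - 1) / subscriberCount;
    }

    // Contiguous ranges: with 10,000 symbols and 10 subscribers,
    // symbols 0-999 map to subscriber 0, 1000-1999 to subscriber 1, and so on.
    int subscriberFor(int symbolId) {
        return symbolId / symbolsPerSubscriber;
    }
}

// Per-subscriber state; only the owning subscriber thread uses it after setup.
class SubscriberHandlers {
    private final Map<Integer, MessageHandler> handlers = new HashMap<>();

    void register(int symbolId, MessageHandler handler) {
        handlers.put(symbolId, handler);
    }

    // Looks up the handler for the symbol and invokes it synchronously.
    // Returns false when no handler is registered for the symbol.
    boolean dispatch(int symbolId, String payload) {
        MessageHandler handler = handlers.get(symbolId);
        if (handler == null) {
            return false;
        }
        handler.handle(payload);
        return true;
    }
}
```

Because every message for a given symbol goes to the same subscriber, per-symbol ordering is preserved without giving each symbol its own thread. A frequency-based scheme would replace `subscriberFor` with a lookup table built from observed traffic.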

mdma
Is there a way to write my program so that conceptually it's 10,000 separate consumers, each working on their own queue? But running as a few threads dealing with a few queues?
Mr. Burgundy
Please see my edit.
mdma
@Mr.Burgundy Sure, there are lots of approaches. As a simple one, you could encapsulate the consumer logic in a class (not tied to a consumer thread), stuff 10k of them in a list, and have _one_ consumer thread look up the proper one and invoke its logic for each message.
nos
@nos - that's going to be single threaded. That will put a cap on the attainable performance when there are multiple cores available.
mdma
@mdma as said, that was the simple approach. If you're on a dual-core and your producer does as much work as the consumer, that's all you need. The next slightly less simple approach is to spawn N consumer threads. Or keep the one consumer thread and dispatch the work to a suitably configured `ExecutorService`, or lose the consumer thread and dispatch the work directly to an Executor.
nos
A: 

Have a pool of consumer threads relative to the hardware and os capacity. These consumer threads could poll your message queue.

I would either have the Messages know how to process themselves or register processors with the consumer thread classes when they are initialized.
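
A minimal sketch of the first option, messages that know how to process themselves, could look like this. The names `SelfProcessingMessage` and `PooledWorker` are assumptions made for the example, and the KILL sentinel follows the convention mentioned in the question.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical message type: each message carries its own processing logic.
interface SelfProcessingMessage {
    void process();
}

// One of a small pool of worker threads draining a queue.
class PooledWorker implements Runnable {
    // Sentinel sent by the producer as its last message.
    static final SelfProcessingMessage KILL = () -> { };

    private final BlockingQueue<SelfProcessingMessage> queue;

    PooledWorker(BlockingQueue<SelfProcessingMessage> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks while the queue is empty, so there is no busy-wait
                SelfProcessingMessage msg = queue.take();
                if (msg == KILL) {
                    return;
                }
                msg.process();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop the worker.
            Thread.currentThread().interrupt();
        }
    }
}
```

If several workers share one queue, the producer would need to enqueue one KILL per worker.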

Jacob Tomaw
+1  A: 

First of all, there's no single correct answer unless you either post a complete design doc or try different approaches yourself.

I'm assuming your processing is not going to be computationally intensive; otherwise you wouldn't be thinking of processing 10000 queues at the same time. One possible solution is to minimise context switching by having one or two threads per CPU. Unless your system has to process data in strict real time, that may give you bigger delays on each queue but better overall throughput.

For example, have your producer thread run on its own CPU and pass batches of messages to the consumer threads. Each consumer thread would then distribute messages to its N private queues, perform the processing step, receive a new data batch, and so on. Again, it depends on your delay tolerance, so the processing step may mean processing all the queues, a fixed number of queues, or as many queues as it can until a time threshold is reached. Being able to easily tell which queue belongs to which consumer thread (e.g. if queues are numbered sequentially: int consumerThreadNum = queueNum & 0x03) would be beneficial, as looking them up in a hash table each time may be slow.
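
To make the `queueNum & 0x03` trick concrete: the mask 0x03 keeps the two low bits, so it spreads sequentially numbered queues over 4 consumer threads. A small sketch that generalises it to any power-of-two thread count follows. The class name `QueueToThreadMap` is invented for the example.

```java
// Hypothetical helper mapping sequential queue numbers to consumer threads.
class QueueToThreadMap {
    private final int mask;

    QueueToThreadMap(int threadCount) {
        // The bit-mask trick only works when the thread count is a power of two.
        if (threadCount <= 0 || Integer.bitCount(threadCount) != 1) {
            throw new IllegalArgumentException("threadCount must be a power of two");
        }
        this.mask = threadCount - 1; // e.g. 4 threads -> mask 0x03
    }

    // Same result as queueNum % threadCount for non-negative queueNum,
    // computed with a single AND instead of a hash lookup.
    int consumerThreadFor(int queueNum) {
        return queueNum & mask;
    }
}
```

With 4 threads, queues 0, 4, 8, ... go to thread 0, queues 1, 5, 9, ... to thread 1, and so on.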

To minimise memory thrashing it may not be such a good idea to create/destroy queues all the time so you may want to pre-allocate a (max number of queues/number of cores) queue objects per thread. When a queue is finished instead of being destroyed it can be cleared and reused. You don't want gc to get in your way too often and for too long.
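
One way to pre-allocate and reuse queues, sketched under the assumption that each consumer thread owns its own pool (so no locking is needed). `QueuePool` is a made-up name for this example, not an existing library class.

```java
import java.util.ArrayDeque;

// Hypothetical per-thread pool of reusable queues. Not thread-safe by design:
// each consumer thread is assumed to own exactly one pool.
class QueuePool<M> {
    private final ArrayDeque<ArrayDeque<M>> free = new ArrayDeque<>();

    // Pre-allocate all queue objects up front.
    QueuePool(int preallocated) {
        for (int i = 0; i < preallocated; i++) {
            free.push(new ArrayDeque<>());
        }
    }

    // Hand out a pooled queue, allocating only if the pool has run dry.
    ArrayDeque<M> acquire() {
        ArrayDeque<M> queue = free.poll();
        return queue != null ? queue : new ArrayDeque<>();
    }

    // Clear the finished queue and keep it for reuse instead of discarding it.
    void release(ArrayDeque<M> queue) {
        queue.clear();
        free.push(queue);
    }

    int available() {
        return free.size();
    }
}
```

The pool size per thread would be roughly the maximum number of queues divided by the number of cores, as suggested above.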

Another unknown is whether your producer produces a complete set of data for each queue or sends data in chunks until the KILL command is received. If your producer sends complete data sets, you may be able to do away with the queue concept completely and just process the data as it arrives at a consumer thread.

Gilead
A: 

In the absence of more detail about the constraints of processing the symbols, it's hard to give very specific advice.

You should take a look at this Slashdot article:

http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

It has quite a bit of discussion and actual measured data about the many-threads vs. single-select vs. thread-pool arguments.

James Branigan
+1  A: 

You could use this (credit goes to http://stackoverflow.com/questions/3253505/which-threadpool-in-java-should-i-use/3253936#3253936):

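import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Main {
    // Static so that main can use it; two pool threads per available processor
    static final ExecutorService threadPool = Executors.newFixedThreadPool(
                                     Runtime.getRuntime().availableProcessors()*2);

    public static void main(String[] args){
        // getConsumers is not shown here; it is assumed to build all the consumers
        Set<Consumer> consumers = getConsumers(threadPool);
        for(Consumer consumer : consumers){
            threadPool.execute(consumer); // schedule each consumer once
        }
    }
}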

and

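class Consumer implements Runnable {
    private final ExecutorService tp;
    private final MessageQueue messageQueue;
    Consumer(ExecutorService tp, MessageQueue queue){
        this.tp = tp;
        this.messageQueue = queue;
    }
    @Override
    public void run(){
        // Handle at most one message per run so the pool threads are shared
        Message msg = messageQueue.poll();

        if (msg != null) {
            try{
                ... // do something with the message
            } finally{
                // Resubmit this consumer so it gets another turn on the pool
                this.tp.execute(this);
            }
        }
        // Note: as written, a consumer that finds its queue empty is not resubmitted
    }
}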

This way, you can have okay scheduling with very little hassle.

Enno Shioji