The collection aggregator used in the Mule 2.0 framework works a bit like this:

  • An inbound router takes a collection of messages and splits it up into a number of smaller messages - each smaller message gets stamped with a correlation id corresponding to the parent message

  • These messages flow through various services

  • Finally these messages arrive at an inbound aggregator that collects the messages based on the correlation id of the parent message and the number of expected messages. Once all of the expected messages have been received, the aggregation function is called and the result is returned (a rough sketch of this grouping step follows the list).
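
As a point of reference, here is roughly how that grouping step behaves. This is only an illustration, not the actual Mule 2.0 API - Message, CollectingAggregator and onMessage are invented names - but it shows why every message in a group sits in memory until the last one arrives:

    import java.util.*;
    import java.util.function.Function;

    // Invented types for illustration - not the Mule 2.0 API.
    record Message(String correlationId, int groupSize, Object payload) {}

    class CollectingAggregator {
        // All messages seen so far, keyed by the correlation id of the parent message.
        private final Map<String, List<Message>> groups = new HashMap<>();
        private final Function<List<Message>, Object> aggregationFunction;

        CollectingAggregator(Function<List<Message>, Object> aggregationFunction) {
            this.aggregationFunction = aggregationFunction;
        }

        // Returns the aggregated result once the whole group has arrived, otherwise empty.
        Optional<Object> onMessage(Message m) {
            List<Message> group = groups.computeIfAbsent(m.correlationId(), k -> new ArrayList<>());
            group.add(m);                       // every message is retained in memory...
            if (group.size() < m.groupSize()) { // ...until the expected count is reached
                return Optional.empty();
            }
            groups.remove(m.correlationId());
            return Optional.of(aggregationFunction.apply(group));
        }
    }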

Now this works fine when the number of messages in a group is reasonably small. However, once the number of messages in a group becomes huge (~100k), a lot of memory is tied up holding the group of messages while waiting for the later messages to arrive. This is made worse if there are multiple groups being aggregated at the same time.

A way around this issue would be to implement a streaming aggregator. In my use case I am essentially summing up the various messages based on a key, and this could be done without having to see all of the messages in the group at the same time. I'd only want to know that all of the messages had been received before forwarding the result on to the endpoint (a rough sketch of this follows).
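
To make the idea concrete, here is a minimal sketch of that incremental approach, assuming invented names (StreamingSumAggregator, onMessage) rather than any existing Mule class. Only a running total per key and a counter are kept per correlation group, so memory no longer grows with the number of messages:

    import java.util.*;

    // Sketch of the streaming idea - invented names, not existing Mule code.
    class StreamingSumAggregator {
        private static final class GroupState {
            final Map<String, Double> sumsByKey = new HashMap<>();
            int received;
        }

        private final Map<String, GroupState> groups = new HashMap<>();

        // Folds one message into its group; returns the totals only when the group is complete.
        Optional<Map<String, Double>> onMessage(String correlationId, int groupSize,
                                                String key, double value) {
            GroupState state = groups.computeIfAbsent(correlationId, id -> new GroupState());
            state.sumsByKey.merge(key, value, Double::sum); // constant memory per distinct key
            state.received++;
            if (state.received < groupSize) {
                return Optional.empty();
            }
            groups.remove(correlationId);                   // free the state as soon as the group is done
            return Optional.of(state.sumsByKey);
        }
    }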

Does this sound like a reasonable solution to the problem?

Is this already implemented somewhere in Mule?

Are there better ways of doing this?

A:

This seems like a reasonable approach (I'm not a Mule expert by any means). I have read all of the Mule documentation and I don't think something like this exists yet - the streaming support is limited to a few connectors and transformers, and it's pretty simple in that it just passes around an InputStream. Since only a few things in Mule stream, you may also need modified versions of any other transformers you use so that they stream as well. You would then implement an aggregator that provides an InputStream and starts streaming as soon as it has received some consecutive sequence of messages (sketched below).
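
For illustration only - the class and method names below are invented, not part of Mule - the "consecutive sequence" idea could look something like this: buffer out-of-order messages and release the consecutive prefix as soon as it is available, so the downstream InputStream can be fed without waiting for the whole group.

    import java.util.*;

    // Invented sketch, not a Mule class: releases messages in sequence order
    // as soon as the next expected sequence number has arrived.
    class ConsecutiveReleaser {
        private final PriorityQueue<int[]> buffer =
                new PriorityQueue<>(Comparator.comparingInt((int[] m) -> m[0])); // [sequence, payload]
        private int nextExpected = 1;

        // Buffers the message, then drains every message that is now in sequence.
        List<int[]> onMessage(int sequence, int payload) {
            buffer.add(new int[] {sequence, payload});
            List<int[]> ready = new ArrayList<>();
            while (!buffer.isEmpty() && buffer.peek()[0] == nextExpected) {
                ready.add(buffer.poll());
                nextExpected++;
            }
            return ready; // these can be written to the stream immediately
        }
    }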

However, one sentence in your description, "... all of the messages had been received before forwarding the results to the endpoint", could be troubling. This by its very nature defeats the purpose of streaming, unless you mean that you (in your service component, presumably) will keep track that you got everything before forwarding the (presumably much smaller) processed result onwards.

Francis Upton