Is there a message queue implementation that allows breaking up work into 'batches' by inserting 'message barriers' into the message stream? To clarify: no message after a barrier should be delivered to any consumer of the queue until all messages before the barrier have been consumed, sort of like a synchronization point. I'd also prefer that all consumers be notified when they reach a barrier.

Anything like this out there?

A: 

I am not aware of any existing, widely available implementations, but I'd propose a very simple, generic implementation using a proxy, where:

  • producers write to the proxy queue/topic
  • the proxy forwards to the original queue/topic until a barrier message is read by the proxy, at which point:
    • the proxy may notify topic subscribers of the barrier by forwarding the barrier message to the original topic, or
    • the proxy may notify queue subscribers of the barrier by:
      • periodically publishing barrier messages until the barrier has been cleared; this does not guarantee that every consumer receives exactly one notification, although all will eventually clear the barrier (some may receive zero notifications, others more than one, depending on how the scheduler distributes messages to consumers, e.g. if it is not round-robin)
      • using a dedicated topic to notify each consumer exactly once per barrier
    • the proxy stops forwarding any messages from the proxy queue until the barrier has been cleared, that is, until the original queue has emptied and/or all consumers have acknowledged all queue/topic messages (if any) leading up to the barrier
  • the proxy resumes forwarding
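
The forwarding rules above can be sketched in memory, without any messaging framework. This is only an illustration under assumptions: `BarrierProxy`, the `BARRIER` marker string, and the `onAck` callback are hypothetical names, a plain list stands in for the original queue, and notifying consumers of the barrier is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for the proxy; a real one would sit between two destinations. */
class BarrierProxy {
    static final String BARRIER = "BARRIER";   // hypothetical marker payload

    private final Deque<String> held = new ArrayDeque<>();     // not yet forwarded
    private final List<String> downstream = new ArrayList<>(); // stands in for the original queue
    private int unacked = 0;          // forwarded but not yet acknowledged
    private boolean blocked = false;  // true while a barrier is pending

    /** Called for every message a producer writes to the proxy queue. */
    void onProducerMessage(String msg) {
        held.addLast(msg);
        pump();
    }

    /** Called when a consumer reports that it has finished one message. */
    void onAck() {
        if (unacked == 0) {
            throw new IllegalStateException("ack without an outstanding message");
        }
        unacked--;
        pump();
    }

    /** Forward as much as the barrier rules allow. */
    private void pump() {
        while (!held.isEmpty()) {
            if (blocked) {
                if (unacked > 0) {
                    return;           // barrier not yet cleared, keep holding
                }
                blocked = false;      // everything before the barrier is done
            }
            String msg = held.removeFirst();
            if (BARRIER.equals(msg)) {
                blocked = true;       // stop forwarding until the acks drain
            } else {
                downstream.add(msg);
                unacked++;
            }
        }
    }

    /** Messages forwarded to the original queue so far, in order. */
    List<String> forwarded() {
        return downstream;
    }
}
```

Feeding it `a`, `b`, `BARRIER`, `c` forwards only `a` and `b`; `c` is released once `onAck()` has been called twice.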

UPDATE

Thanks to Miklos for pointing out that under JMS the framework does not provide acknowledgements for asynchronous deliveries (what JMS calls "acknowledgements" is purely a consumer-side concept and cannot be proxied as such).

So, under JMS, the existing implementation (to be adapted for barriers) may already provide application-level acknowledgements via an "acknowledgement queue" (as opposed to the original queue -- which would be a "request queue".) The consumers would have to acknowledge execution of requests by sending acknowledgement messages to the proxy acknowledgement queue; the proxy would use the acknowledgement messages to track when the barrier has been cleared, after having also forwarded the acknowledgement messages to the producer.

If the existing implementation (to be adapted for barriers) does not already provide application-level acknowledgements via an "acknowledgement queue", then you could either:

  • have the proxy use the QueueBrowser, provided that:
    1. you are dealing with queues, not events, that
    2. you want to synchronize on delivery not acknowledgement of execution, and
    3. it is OK to synchronize on first delivery, even if the request was actually aborted and has to be re-delivered (even after the barrier has been cleared.) I think Miklos already pointed this problem out IIRC.
  • otherwise, add an acknowledgment queue consumed by the proxy, and adapt the consumers to write acknowledgements to it (essentially the JMS scenario above, except it is not necessary for the proxy to forward acknowledgement messages to the producer unless your producer needs the functionality.)
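
As a rough sketch of the acknowledgement-queue variant, the proxy could track outstanding requests by ID and treat the barrier as cleared once none remain. `AckTracker` and its method names are hypothetical, and the request IDs are assumed to be carried in the application-level acknowledgement messages; tracking IDs rather than a bare count means a duplicate acknowledgement after a re-delivery is simply ignored.

```java
import java.util.HashSet;
import java.util.Set;

/** Tracks which forwarded requests are still waiting for an application-level ack. */
class AckTracker {
    private final Set<String> pending = new HashSet<>();

    /** Record a request the proxy has just forwarded to the request queue. */
    void forwarded(String requestId) {
        pending.add(requestId);
    }

    /**
     * Record an acknowledgement read from the acknowledgement queue.
     * Returns false for unknown or duplicate IDs, which are ignored.
     */
    boolean acknowledged(String requestId) {
        return pending.remove(requestId);
    }

    /** True once every request forwarded before the barrier has been acknowledged. */
    boolean barrierCleared() {
        return pending.isEmpty();
    }
}
```

The proxy would call `barrierCleared()` after each acknowledgement and resume forwarding when it returns true.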

Cheers, V.

vladr
The producer can detect if there are un-delivered messages on the queue, using a Queue Browser, but the producer cannot detect when a message is acknowledged by a consumer. I think this problem can only be solved if there is a custom communication protocol between the proxy and the consumers.
Miklos
@Miklos, can you elaborate? I don't see a problem with either the proxy or producer getting the acknowledgments as long as both the proxy and the consumers are using the same acknowledgment mode and, e.g. in the case of the `CLIENT_ACKNOWLEDGE` mode, the proxy forwards the acknowledgements to the producer, too.
vladr
@Vlad, the acknowledgement is received and handled by the JMS server, not the message producer. In this sense the proxy is also a message producer. The JMS server is decoupling the consumer from the producer: the producer will not know what the consumer does, who it is, how many there are. This is a basic principle of asynchronous messaging.
Miklos
@Miklos: correct, I had not properly captured the concept of JMS message acknowledgement being purely a consumer side concept during my initial JMS documentation reading. I am updating the response accordingly.
vladr
A: 

You could achieve this using a topic for the 'Barrier Message' and a queue for the 'batched items' which are consumed with selective receivers.
Publishing the Barrier Message to a topic ensures that all consumers receive their own copy of the Barrier Message.

Each consumer will need two subscriptions:

  1. To the Barrier Topic
  2. A selective receiver against the batch queue, using selection criteria defined by the Barrier Message.

The Barrier Message will need to contain a batch key that must be applied to the queue consumer's selection criteria.
e.g. batchId = n
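or JMSMessageID < 100 (note that JMSMessageID is a String, and JMS selectors only allow = and <> comparisons on strings, so this form is unlikely to work as written)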
or JMSTimestamp < xxx

Whenever a barrier message is received,

  • the current queue consumer must be closed
  • the queue selection criteria must be modified using the content of the Barrier Message
  • a new selective consumer must be started using the modified selection criteria

If you are going to use a custom batch key for the selection criteria, such as 'batchId' above, then the assumption is that all message producers are capable of setting that JMS property; otherwise a proxy will have to consume the messages, set the property, and republish them to the queue where the selective consumers are listening.
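
To make the consumer-side switch concrete, here is a small in-memory model of a queue read through a `batchId` selector. Everything in it is an assumption for illustration: `BatchedQueueSketch`, `Msg`, and `onBarrier` are hypothetical names, and a real consumer would instead close its receiver and create a new one with the selector string returned by `selectorFor`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** In-memory model of a queue consumed through a 'batchId' selector. */
class BatchedQueueSketch {
    /** Hypothetical message shape: a body plus the custom batchId property. */
    static final class Msg {
        final long batchId;
        final String body;
        Msg(long batchId, String body) {
            this.batchId = batchId;
            this.body = body;
        }
    }

    private final List<Msg> queue = new ArrayList<>();
    private long selectedBatch;

    BatchedQueueSketch(long initialBatch) {
        this.selectedBatch = initialBatch;
    }

    /** Selector text a new selective consumer could be created with. */
    static String selectorFor(long batchId) {
        return "batchId = " + batchId;
    }

    void send(long batchId, String body) {
        queue.add(new Msg(batchId, body));
    }

    /** Barrier received: switch the selection criteria to the next batch. */
    void onBarrier(long nextBatchId) {
        selectedBatch = nextBatchId;
    }

    /** Remove and return the bodies of all messages matching the current selector. */
    List<String> receiveMatching() {
        List<String> out = new ArrayList<>();
        for (Iterator<Msg> it = queue.iterator(); it.hasNext(); ) {
            Msg m = it.next();
            if (m.batchId == selectedBatch) {
                out.add(m.body);
                it.remove();
            }
        }
        return out;
    }
}
```

In this model, messages tagged with a later batch simply stay on the queue until the selector changes, so they are not lost while the consumer is being switched.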

For more info on selective receivers see these links:

  1. http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
  2. http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/QueueSession.html#createReceiver(javax.jms.Queue,%20java.lang.String)
crowne
And how does a consumer know when it has consumed all pending messages, so that the barrier can be removed (or a new barrier can be set, if you prefer)? Unless you count how many messages have been produced and consumed (which is hard because of concurrency and transactions), there is no way to know that as per the JMS spec.
ewernli
@crowne, can you clarify for the readers whether or not there is a potential race condition just before a new selective consumer is started, i.e. is there any chance that any messages sent after the barrier, but before a new selective consumer is started, will be ignored by the new selective consumer?
vladr