I'd like to selectively delete messages from an AMQP queue without even reading them.

The scenario is as follows:

The sending side wants to expire messages of type X whenever new information of type X arrives. Because it is very likely that the subscriber has not yet consumed the latest message of type X, the publisher should just delete the previous X-type messages and put the newest one into the queue. The whole operation should be transparent to the subscriber; in fact, the subscriber should be able to use something as simple as STOMP to get the messages.

How can I do this using AMQP? Or is it perhaps more convenient in another messaging protocol?

I'd like to avoid complicated infrastructure. The messaging I need is as simple as described above: one queue, one subscriber, one publisher, but the publisher must be able to delete messages ad hoc according to a given criterion.

The publisher client will use Ruby, but I'm fine with any language once I find out how to do it in the protocol.

A: 

You cannot currently do this in RabbitMQ (or, more generally, in AMQP) automatically. But here's an easy workaround.

Let's say you want to send three types of messages: Xs, Ys and Zs. If I understand your question correctly, when an X message arrives, you want the broker to forget all other X messages that haven't been delivered.

This is fairly easy to do in RabbitMQ:

  • the producer declares three queues: X, Y, and Z (they're automatically bound to the default exchange with their names as routing keys, which is exactly what we want),
  • when publishing a message, the producer first purges the relevant queue (so, if it's publishing an X message, it first purges the X queue); this effectively removes the outdated messages,
  • the consumer simply consumes from the queue it wants (X for X messages, Y for Y messages, etc.); from its point of view, it just has to do a basic.get to get the next relevant message.
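The steps above can be sketched roughly as follows. This is only an illustration, not a tested RabbitMQ setup: `InMemoryChannel` is a hypothetical in-memory stand-in for a broker channel, and `publish_latest` is a made-up helper name. The method names (`queue_declare`, `queue_purge`, `basic_publish`, `basic_get`) are chosen to resemble those on a typical AMQP client channel, such as pika's `BlockingChannel`, but here `basic_get` is simplified to return only the message body.

```python
from collections import defaultdict, deque


class InMemoryChannel:
    """Hypothetical stand-in for a broker channel; not a real AMQP client."""

    def __init__(self):
        self.queues = defaultdict(deque)

    def queue_declare(self, queue):
        self.queues[queue]  # touching the key creates an empty queue

    def queue_purge(self, queue):
        self.queues[queue].clear()

    def basic_publish(self, exchange, routing_key, body):
        # The default exchange ("") routes to the queue named by the routing key.
        self.queues[routing_key].append(body)

    def basic_get(self, queue):
        # Simplified: returns the body, or None when the queue is empty.
        q = self.queues[queue]
        return q.popleft() if q else None


def publish_latest(channel, msg_type, body):
    """Purge the per-type queue, then publish, so only the newest message remains."""
    channel.queue_declare(queue=msg_type)
    channel.queue_purge(queue=msg_type)
    channel.basic_publish(exchange="", routing_key=msg_type, body=body)


channel = InMemoryChannel()
publish_latest(channel, "X", "x-1")
publish_latest(channel, "Y", "y-1")
publish_latest(channel, "X", "x-2")  # replaces the undelivered x-1
```

With a real client, the same three calls would go to the broker instead of the in-memory dictionary; the consumer side is unchanged and just reads from the queue it cares about.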

This implies a race condition when two producers send the same type of message at about the same time. The result is that it's possible for a queue to hold two (or more) messages at once, but since the number of messages is upper-bounded by the number of producers, and since the superfluous messages are purged on the next publish, this shouldn't be much of a problem.

To summarize, this solution adds just one extra step compared to the optimal solution, namely purging queue X before publishing a message of type X.
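To make the race concrete, here is a small simulation of one unlucky interleaving. It assumes two producers (called A and B here purely for illustration) and models the X queue as a plain in-memory `deque`; `purge` and `publish` are hypothetical stand-ins for the broker operations, not real client calls.

```python
from collections import deque

queue_x = deque()  # hypothetical in-memory stand-in for the broker's X queue


def purge():
    queue_x.clear()


def publish(body):
    queue_x.append(body)


# Producers A and B each intend "purge, then publish", but their steps interleave.
purge()                # A purges
purge()                # B purges
publish("x-from-A")    # A publishes
publish("x-from-B")    # B publishes
depth_after_race = len(queue_x)  # both messages are queued

# The next publish by any producer purges the leftovers first.
purge()
publish("x-next")
depth_after_next = len(queue_x)
```

In this interleaving the queue briefly holds one message per producer, and the following purge-and-publish brings it back down to a single message.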

If you need any help setting up this configuration, the perfect place to ask for advice is the rabbitmq-discuss mailing list.

scvalex
In my case the sole reason for using a queue is that X, Y, and Z events are interleaved and the subscriber should read them in the same order as they arrive, but only the latest X, latest Y, and latest Z. Also, the number of types is on the order of thousands, so the subscriber won't listen on thousands of queues.
Wojciech Kaczmarek
Ah. Listening to thousands of queues wouldn't have been a problem (there's next to no overhead). You cannot really selectively delete messages once they're on a queue. It occurs to me that what you want is a fileserver that supports atomic move operations: the publisher starts writing into a temporary file, and when it's done, it moves (renames) that file to X; the client then just reads the file X.
scvalex
The whole point of using a queue is to get rid of polling on the subscriber's side, which would be necessary with your suggested fileserver solution. Actually, I already have something like a specialized fileserver; what I need is to notify the client that changes occurred and to expire some of those notifications before the client gets them. Well, I can live without such an expiry mechanism and just tell the client that older notifications will be invalid. I remember selective removal was possible in MQSeries and thought it would be doable in AMQP, since it seems to be a "complicated" enough protocol.
Wojciech Kaczmarek