views:

964

answers:

2

Im trying to move a system from using morbid to rabbitmq, but I cannot seem to get the same broadcast behaviour morbid supplied by default. By broadcast I mean that when a message is added to the queue, every consumer recieves it. With rabbit, when a message is added they are distributed round robin style to every listener.

Can anyone tell me how to achieve the same kind of message distribution?

The stomp library used below is http://code.google.com/p/stomppy/

Failing being able to do with with stomp, even a amqplib example would really help.

My code at present looks like this

The Consumer

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demoqueue', ack='auto')

while True:
    pass
conn.disconnect()

And the sender looks like this

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demotopic', ack='auto')

while True:
    pass
conn.disconnect()
+3  A: 

Apparently you can't do with directly with STOMP; there is a mailing list thread that shows all the hoops you have to jump through to get broadcast working with stomp (it involves some lower-level AMPQ stuff).

Rick Copeland
Thanks, I've seen the thread before, and tried to implement its suggestions with amqplib without success. The specific message that touches on it is http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2008-September/001786.html. I've updated the question to reflect that a amqplib sample would do the job for me.
Sean O Donnell
+3  A: 

I finally figured out how to do it by creating an exchange for each "recieving group", im not sure how well rabbit will do with thousands of exchanges, so you might want to figure test this heavily before trying it in production

In the sending code:

conn.send(str(i), exchange=exchange, destination='')

The blank destination is required, all I care about is sending to that exchange

To recieve

import stomp
import sys
from amqplib import client_0_8 as amqp
#read in the exchange name so I can set up multiple recievers for different exchanges to tset
exchange = sys.argv[1]
conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
 virtual_host="/", insist=False)

chan = conn.channel()

chan.access_request('/', active=True, write=True, read=True)

#declare my exchange
chan.exchange_declare(exchange, 'topic')
#not passing a queue name means I get a new unique one back
qname,_,_ = chan.queue_declare()
#bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")
headers = {}

#subscribe to the queue
conn.subscribe(destination=qname, ack='auto')

while True:
    pass
conn.disconnect()
Sean O Donnell