views:

540

answers:

1

I'm using py-amqplib to access RabbitMQ in Python. The application receives requests to listen on certain MQ topics from time to time.

The first time it receives such a request it creates an AMQP connection and a channel and starts a new thread to listen for messages:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

AMQPListener is very simple:

class AMQPListener(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.__channel = channel

    def run(self):
        while True:
            self.__channel.wait()

After creating the connection it subscribes to the topic of interest, like this:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

The first time this all works fine. However, it fails on a subsequent request to subscribe to another topic. On subsequent requests I re-use the AMQP connection and AMQPListener thread (since I don't want to start a new thread for each topic) and when I call the code block above the channel.queue_declare() method call never returns. I've also tried creating a new channel at that point and the connection.channel() call never returns, either.

The only way I've been able to get it to work is to create a new connection, channel and listener thread per topic (ie. routing_key), but this is really not ideal. I suspect it's the wait() method that's somehow blocking the entire connection, but I'm not sure what to do about it. Surely I should be able to receive messages with several routing keys (or even on several channels) using a single listener thread?

A related question is: how do I stop the listener thread when that topic is no longer of interest? The channel.wait() call appears to block forever if there are no messages. The only way I can think of is to send a dummy message to the queue that would "poison" it, ie. be interpreted by the listener as a signal to stop.

A: 

If you want more than one comsumer per channel just attach another one using basic_consume() and use channel.wait() after. It will listen to all queues attached via basic_consume(). Make sure you define different consumer tags for each basic_consume().

Use channel.basic_cancel(consumer_tag) if you want to cancel a specific consumer on a queue (cancelling listen to a specific topic).

Torsten
Thanks, but the problem is I may need to subscribe to new topics at any time, ie. after `channel.wait()` has been called.
Evgeny