tags:

views:

570

answers:

2

I can't seem to find a way to listen for new producer and consumer connections (or connection interrupts) in ActiveMQ (Java Version). I want to be able to tell the consumers (or they can find out themselves) that the producer's connection dropped. The other way around (the producer finding out that a certain consumer disconnected) is also required.

I'd appreciate some help.

+3  A: 

I think you want to listen for new producers and consumers on a particular destination (a particular queue or topic). Is that right?

You can instantiate ConsumerEventSource and ProducerEventSource, and register your own listeners by calling their setConsumerListener and setProducerListener on them, respectively.

So:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

If you look at the code for ConsumerEventSource or ProducerEventSource, you'll see that they're simple objects that use the methods of AdvisorySupport to listen on a special advisory topic whose business it is to broadcast news about producers and consumers. You might learn more by reading the source code for those classes.

Your use of "connection" is potentially a problem; in ActiveMQ land (which is a subset of JMS land), a "Connection" is a lower-level object that isn't associated with a particular destination. A particular client creates a "Session" from a Connection - still not specific to a destination - and then creates a destination-specific QueueSender, QueueReceiver, TopicPublisher, or TopicSubscriber. When those are created, or when the sessions that created them die, those are the events you want to hear about, and will hear about if you use the code above.

Ladlestein
Thanks a lot! That's exactly what I meant. I also tried to apply this code to a Stomp connection (failover://stomp://localhost:61613) but unfortunately it doesn't work. I think it's because Advisory messages are empty when received through a Stomp subscription (https://issues.apache.org/activemq/browse/AMQ-2098). If, however, you know any workaround, please let me know.
Tiago Alves
A: 

All the information I need is published in the ActiveMQ Advisory topics such as "ActiveMQ.Advisory.Connection" or simply "ActiveMQ.Advisory..>" for all of them. Even the events that happen in a Stomp Connection are published in the ActiveMQ Advisory topics. The following code gives an example of this (tested with a Flex Client connected through Stomp):

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});
Tiago Alves