We have an issue in ActiveMQ where a huge number of messages are not dropping off our topics. The topics are set to non-persistent and non-durable. Our activemq.xml file is:

<beans>

  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false" persistent="false">

    <!--
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="../data"/>
    </persistenceAdapter>
    -->

    <transportConnectors>
      <transportConnector uri="vm://localhost"/>
    </transportConnectors>

  </broker>

</beans>

and our topic definition in messaging-config.xml is:

<destination id="traceChannel">

    <properties>

        <network>
            <session-timeout>10</session-timeout>
        </network>

        <server>
            <message-time-to-live>10000</message-time-to-live>
            <durable>false</durable>
            <durable-store-manager>flex.messaging.durability.FileStoreManager</durable-store-manager>
        </server>

        <jms>
            <destination-type>Topic</destination-type>
            <message-type>javax.jms.ObjectMessage</message-type>
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>dynamicTopics/traceTopic</destination-jndi-name>
            <delivery-mode>NON_PERSISTENT</delivery-mode>
            <message-priority>DEFAULT_PRIORITY</message-priority>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>false</transacted-sessions>
            <initial-context-environment>
                <property>
                    <name>Context.INITIAL_CONTEXT_FACTORY</name>
                    <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
                </property>
                <property>
                    <name>Context.PROVIDER_URL</name>
                    <value>tcp://localhost:61616</value>
                </property>
            </initial-context-environment>
        </jms>
    </properties>

    <channels>
        <channel ref="rtmps" />
    </channels>

    <adapter ref="trace" />

</destination>

What I am trying to achieve is that only the last 10 messages are on the topic at any one time. Leaving it running overnight results in over 150K messages on the topic, even though it should only be holding a very small number.

+1  A: 

I'm not sure that what you want can be achieved, but there are several things you can try.

First of all, you can set a time to live on your messages:

public ITopicPublisher CreateTopicPublisher(string selector)
{
    try
    {
        IMessageProducer producer = m_session.CreateProducer(m_topic);
        ActiveMQPublisher publisher = new ActiveMQPublisher(producer);

        // Set the time to live, here to 1 minute as an example
        TimeSpan messageTTL = TimeSpan.FromMilliseconds(60000);
        publisher.TimeToLive = messageTTL;

        if (!String.IsNullOrEmpty(selector))
        {
            publisher.IsSelector = true;
            publisher.Selector = selector;
        }
        return publisher;
    }
    catch (Exception ex)
    {
        Logger.Exception(ex);
        throw;
    }
}

You will still get the 150k messages in the morning, but as soon as one consumer connects to your topic the expired messages will be gone.
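
To make the time-to-live rule concrete, here is a minimal sketch in plain Java (no ActiveMQ classes involved). It models the usual JMS convention: the expiration timestamp is the send time plus the time to live, and a time to live of 0 means the message never expires. The names `TtlSketch`, `expirationFor` and `isExpired` are made up for illustration, so treat this as a model of the arithmetic rather than the broker's actual code.

```java
/** Hypothetical helper modelling JMS-style time-to-live arithmetic. */
final class TtlSketch {
    private TtlSketch() {}

    /** Expiration timestamp in ms; 0 means "never expires" (the JMS convention). */
    static long expirationFor(long sendTimeMillis, long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttl must be >= 0");
        }
        return ttlMillis == 0 ? 0L : sendTimeMillis + ttlMillis;
    }

    /** A message counts as expired once the clock has passed its non-zero expiration. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0L && nowMillis > expirationMillis;
    }
}
```

With the 60000 ms value used above, a message sent at time t becomes eligible for discarding just after t + 60000.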

You might also want to have a look at the message eviction strategy.

Edit: I just realized one thing. You say "leaving it running overnight results in over 150K messages on the topic even though it should only be holding a very small number." A topic has no concept of messages waiting to be picked up: if nobody is subscribed, a message sent there is dropped. The "Message enqueued" counter will still increase by one for every message sent, though. If you only want "the last 10 messages" sent, maybe you should use a queue instead of a topic?
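
The difference between "enqueued" and "held" can be sketched with a toy model in plain Java. This is an illustration under simplifying assumptions, not ActiveMQ code, and `TopicSketch` and its methods are hypothetical names: every publish bumps the counter, but a copy only goes to whoever is subscribed at that moment.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a non-durable topic; not ActiveMQ code. */
final class TopicSketch {
    private final List<List<String>> subscribers = new ArrayList<>();
    private long enqueueCount; // counts every publish, delivered or not

    /** Registers a subscriber and returns its inbox. */
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    /** Copies the message to current subscribers only; nothing is kept for later ones. */
    void publish(String message) {
        enqueueCount++;
        for (List<String> inbox : subscribers) {
            inbox.add(message);
        }
    }

    long enqueueCount() {
        return enqueueCount;
    }
}
```

In this model, a message published before anyone subscribes raises the counter but never reaches a later subscriber, which is why a large enqueue count does not by itself mean that many messages are being stored.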

Srodriguez
A: 

I don't know if it works, as I haven't had the chance to test it yet, but please take a look at constantPendingMessageLimitStrategy here: http://activemq.apache.org/slow-consumer-handling.html

Good luck, Claudio

Claudio Fernandez
+2  A: 

As far as I know, messages sent to a non-durable topic with no subscribers should be dropped. Only currently registered consumers get a copy of the message.

How are you checking that the topic contains these 150K messages? Via JMX?

Regardless of the fact that your non-durable topic shouldn't be caching these 150K messages, you can limit the number of messages held per consumer using a broker destination policy:

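<broker>
...
  <destinationPolicy>
    <policyMap><policyEntries>
      <policyEntry topic=">">
        <pendingMessageLimitStrategy>
          <constantPendingMessageLimitStrategy limit="10"/>
        </pendingMessageLimitStrategy>
      </policyEntry>
    </policyEntries></policyMap>
  </destinationPolicy>
...
</broker>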
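
For intuition about what a constant pending limit of 10 does, here is a rough sketch in plain Java of a per-consumer buffer that never holds more than a fixed number of messages. It assumes the oldest pending message is the one evicted when the limit is exceeded. `PendingBufferSketch` and its methods are hypothetical names, and this models the idea only, not the broker's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a per-consumer pending buffer with a constant limit; not ActiveMQ code. */
final class PendingBufferSketch {
    private final int limit;
    private final Deque<String> pending = new ArrayDeque<>();
    private long discarded;

    PendingBufferSketch(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1");
        }
        this.limit = limit;
    }

    /** Adds a message, evicting the oldest pending one when the limit is exceeded. */
    void offer(String message) {
        pending.addLast(message);
        while (pending.size() > limit) {
            pending.removeFirst();
            discarded++;
        }
    }

    int size() {
        return pending.size();
    }

    String oldest() {
        return pending.peekFirst();
    }

    long discarded() {
        return discarded;
    }
}
```

Feeding 150,000 messages into `new PendingBufferSketch(10)` leaves only the 10 newest in the buffer, which is the behaviour the question is asking for on a slow or idle consumer.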
Henryk Konsek