views:

145

answers:

0

We have an ActiveMQ topic into which a Java program inserts messages, where a C# client retrieves them. This has been working fine for a long time, with most of our messages around 1 - 2K. Recently we had a message closer to 10K (9,910 bytes); we take the message, wrap it with some XML, and send it to the topic.

When the C# application pulls the messages, it has been working fine. For this one message, it retrieves a message with no content.

I can see nothing in the configuration files or elsewhere that would enforce a limit; the <systemUsage> parameter is commented out in the config files. The code works fine for the other messages.

The debug statement prior to enqueueing shows a length for the "MessageContent" of 10366. When dequeueing, all properties are fine. The body contains a single key "MessageContent", but its value is an empty string.

The code to enqueue is:

try
{
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = null;
    Topic topic = session.createTopic(getSubject());
    producer = session.createProducer(topic);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    MapMessage mapMessage = session.createMapMessage();
    if(properties!=null && !properties.isEmpty())
    {
        java.util.Iterator keyIter = properties.keySet().iterator();
        while(keyIter.hasNext())
        {
            String key = keyIter.next();
            mapMessage.setStringProperty(key, properties.get(key));
        }
    }
    mapMessage.setString("MessageContent", xmlCommand);
    logger.info("Message content length=" + mapMessage.getString("MessageContent").length());
    producer.send(mapMessage);
}

To dequeue:

{
    IConnectionFactory factory = new ConnectionFactory(_msgBusURL);
    IConnection conn = factory.CreateConnection();
    conn.ExceptionListener += new ExceptionListener(conn_ExceptionListener);
    ISession session = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    IDestination dest = session.GetTopic(TOPICTYPE);
    IMessageConsumer receiver = session.CreateConsumer(dest);
    receiver.Listener += new MessageListener(receiver_Listener);
    conn.Start();
}

private void receiver_Listener(IMessage message)
{
    IMapMessage mapMessage = message as IMapMessage;
    if (mapMessage != null)
    {
        string content = mapMessage.Body.GetString("MessageContent");
        Logger.Info("Body count=" + mapMessage.Body.Count);
        Logger.Info("Prop count=" + mapMessage.Properties.Count);
        foreach (string curkey in mapMessage.Body.Keys)
        {
            Logger.Info("Body Key=" + curkey + ", value=" + mapMessage.Body[curkey]);
        }
        foreach (string curkey in mapMessage.Properties.Keys)
        {
            Logger.Info("Prop Key=" + curkey + ", value=" + mapMessage.Properties[curkey]);
        }
    }
}