views:

389

answers:

4

We have a MSMQ Queue setup that receives messages and is processed by an application. We'd like to have another process subscribe to the Queue and just read the message and log it's contents.

I have this in place already, the problem is it's constantly peeking the queue. CPU on the server when this is running is around 40%. The mqsvc.exe runs at 30% and this app runs at 10%. I'd rather have something that just waits for a message to come in, get's notified of it, and then logs it without constantly polling the server.

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub
+2  A: 

A Thread.Sleep(10) in between peek iterations may save you a bunch of cycles.

The only other option I can think of is to build the logging into the queue reading application.

StingyJack
My only problem with that is there is another process reading the messages out of the queue at the same time. If i slept and something else removed the message from the queue wouldn't I miss it?
Paul Lemke
@lemkepf, even without the sleep you are not guaranteed to see every message posted to the queue. Your logging process will not ALWAYS be executing, and even if it were, a message could be removed from the queue while you were still logging the last message.
ScottS
This is the solution I went with. It dropped CPU from 30% to about 4%. If i miss a few here and there it's not a big deal. Also, the process that pulls the messages out of the queue takes about 1 minute to accomplish it's task, so waiting 10 milliseconds was fine. Thanks.
Paul Lemke
A: 

There's no API that will let you peek at each message only once.

The problem is that BeginPeek executes its callback immediately if there's already a message on the queue. Since you aren't removing the message (this is peek after all, not receive!), when your callback begins peeking again the process starts over, so MessageFound runs almost constantly.

Your best options are to log the messages in the writer or the reader. Journaling will work for short periods (if you only care about messages that are received), but aren't a long-term solution:

While the performance overhead of retrieving messages from a queue that is configured for Journaling is only about 20% more than retrieving messages without Journaling, the real cost is unexpected problems caused when an unchecked MSMQ service runs out of memory or the machine is out of disk space

Jeff Sternal
A: 

This works for me. It blocks the thread while waiting for a message. Each loop cycle checks the class member _bServiceRunning to see if the thread should abort.

    private void ProcessMessageQueue(MessageQueue taskQueue)
    {
        // Set the formatter to indicate body contains a binary message:
        taskQueue.Formatter = new BinaryMessageFormatter();

        // Specify to retrieve selected properties.
        MessagePropertyFilter myFilter = new MessagePropertyFilter();
        myFilter.SetAll();
        taskQueue.MessageReadPropertyFilter = myFilter;

        TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds

        // Monitor the MSMQ until the service is stopped:
        while (_bServiceRunning)
        {
            rxMessage = null;

            // Listen to the queue for the configured duration:
            try
            {
                // See if a message is available, and if so remove if from the queue if any required
                // web service is available:
                taskQueue.Peek(tsQueueReceiveTimeout);

                // If an IOTimeout was not thrown, there is a message in the queue
                // Get all the messages; this does not remove any messages
                Message[] arrMessages = taskQueue.GetAllMessages();

                // TODO: process the message objects here;
                //       they are copies of the messages in the queue
                //       Note that subsequent calls will return the same messages if they are
                //       still on the queue, so use some structure defined in an outer block
                //       to identify messages already processed.

            }
            catch (MessageQueueException mqe)
            {
                if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // The peek message time-out has expired; there are no messages waiting in the queue
                    continue; // at "while (_bServiceRunning)"
                }
                else
                {
                    ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                    break; // from "while (_bServiceRunning)"
                }
            }
            catch (Exception ex)
            {
                ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                break; // from "while (_bServiceRunning)"
            }
        }

    } // ProcessMessageQueue()
Simon Chadwick
I think the original poster is trying to avoid calling `Receive` since there's another process that's doing that.
Jeff Sternal
@Jeff: Thanks for the pointer. I updated the code sample to use the MessageQueue.GetAllMessages() method, which reads (but does not extract) all the messages in a queue. Note that this would not scale well if the queue becomes loaded with a large number of messages. He could add a dynamically adjusting Sleep() delay timer in the loop, based on the number of messages found in the previous GetAllMessages() call.
Simon Chadwick
This polling approach is the same in spirit as the code in the question. If there is another process removing messages from the queue, neither solution is assured to see all the messages that pass through the queue. In many situations this approach will catch "most" messages but you'll never know when one slips by. Logging when sending or receiving the messages is the best way to catch them all.
ScottS
+1  A: 

Have you tried using MSMQEvent.Arrived to track the messages?

The Arrived event of the MSMQEvent object is fired when the MSMQQueue.EnableNotification method of an instance of the MSMQQueue object representing an open queue has been called and a message is found or arrives at the applicable position in the queue.

Stuart Dunkeld