I have a WCF service that posts messages to a private, non-transactional MSMQ queue. I have another WCF service (multi-threaded) that processes the MSMQ messages and inserts them in the database.

My issue is with sequencing. I want the messages to be inserted in a certain order. For example, MSG-A needs to go into the database before MSG-B is inserted. My current solution for that is very crude and expensive from the database's perspective.

I read the message; if it's MSG-B and there is no MSG-A in the database, I throw it back on the message queue, and I keep doing that until MSG-A has been inserted. But this is a very expensive operation, as it involves a table scan (a SELECT statement) each time.

The messages are always posted to the queue in sequence.

Short of making my WCF queue-processing service single-threaded (by setting the service behavior attribute InstanceContextMode to Single), can someone suggest a better solution?

Thanks

Dan

+1  A: 

Instead of immediately pushing messages to the DB after taking them out of the queue, keep a list of pending messages in memory. When you get an A or B, check to see if the matching one is in the list. If so, submit them both (in the right order) to the database, and remove the matching one from the list. Otherwise, just add the new message to that list.

If checking for a match is too expensive a task to serialize - I assume you are multithreading for a reason - then you could have another thread process the list. The existing multiple threads read, immediately submit most messages to the DB, but put the As and Bs aside in the (thread-safe) list. The background thread scavenges through that list finding matching As and Bs, and when it finds them it submits them in the right order (and removes them from the list).

The bottom line is - since you're removing items from the queue with multiple threads, you're going to have to serialize somewhere in order to ensure ordering. The trick is to minimize the number of times, and the length of time, you spend locked up in serial code.

There might also be something you could do at the database level, with triggers or something, to reorder the entries when it detects this situation. I'm afraid I don't know enough about DB programming to help there.

UPDATE: Assuming the messages contain some id that lets you associate a message 'A' with the correct associated message 'B', the following code will make sure A goes in the database before B. Note that it does not make sure they are adjacent records in the database - there could be other messages between A and B. Also, if for some reason you get an A or B without ever receiving the matching message of the other type, this code will leak memory since it hangs onto the unmatched message forever.

(You could extract those two 'lock'ed blocks into a single subroutine, but I'm leaving it like this for clarity with respect to A and B.)

private static readonly object dictionaryLock = new object();
private static readonly Dictionary<int, MyMessage> receivedA = 
    new Dictionary<int, MyMessage>();
private static readonly Dictionary<int, MyMessage> receivedB = 
    new Dictionary<int, MyMessage>();

public void MessageHandler(MyMessage message)
{
    MyMessage matchingMessage = null;
    if (IsA(message))
    {
        InsertIntoDB(message);
        lock (dictionaryLock)
        {
            if (receivedB.TryGetValue(message.id, out matchingMessage))
            {
                receivedB.Remove(message.id);
            }
            else
            {
                receivedA.Add(message.id, message);
            }
        }
        if (matchingMessage != null)
        {
            InsertIntoDB(matchingMessage);
        }
    }
    else if (IsB(message))
    {
        lock (dictionaryLock)
        {
            if (receivedA.TryGetValue(message.id, out matchingMessage))
            {
                receivedA.Remove(message.id);
            }
            else
            {
                receivedB.Add(message.id, message);
            }
        }
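        if (matchingMessage != null)
        {
            // The matching A was inserted before it was recorded in
            // receivedA, so this B can safely go into the database now.
            InsertIntoDB(message);
        }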
    }
    else
    {
        // not A or B, do whatever
    }
}
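
The leak mentioned above could be bounded by recording when each unmatched message was parked and periodically discarding entries that have waited too long. What follows is only a minimal sketch: the `PendingStore` class, its method names and the idea of a caller-chosen maximum age are all assumptions made for illustration, not part of the code above. Whether an orphan should be dropped, logged or inserted anyway depends on your application.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: parks unmatched messages together with their
// arrival time so that stale ones can be evicted later.
public class PendingStore<TMessage>
{
    private readonly object sync = new object();
    private readonly Dictionary<int, KeyValuePair<DateTime, TMessage>> pending =
        new Dictionary<int, KeyValuePair<DateTime, TMessage>>();

    // Remember an unmatched message and the time it was parked.
    public void Park(int id, TMessage message, DateTime nowUtc)
    {
        lock (sync)
        {
            pending[id] = new KeyValuePair<DateTime, TMessage>(nowUtc, message);
        }
    }

    // Removes and returns the parked message for this id, if there is one.
    public bool TryTake(int id, out TMessage message)
    {
        lock (sync)
        {
            KeyValuePair<DateTime, TMessage> entry;
            if (pending.TryGetValue(id, out entry))
            {
                pending.Remove(id);
                message = entry.Value;
                return true;
            }
            message = default(TMessage);
            return false;
        }
    }

    // Removes every entry parked longer than maxAge and returns the
    // evicted messages so the caller can log or otherwise handle them.
    public List<TMessage> EvictOlderThan(TimeSpan maxAge, DateTime nowUtc)
    {
        lock (sync)
        {
            List<int> staleIds = pending
                .Where(p => nowUtc - p.Value.Key > maxAge)
                .Select(p => p.Key)
                .ToList();
            List<TMessage> evicted = new List<TMessage>();
            foreach (int id in staleIds)
            {
                evicted.Add(pending[id].Value);
                pending.Remove(id);
            }
            return evicted;
        }
    }
}
```

A timer or the background thread could call `EvictOlderThan` every so often; the current time is passed in rather than read inside the class only to keep the sketch easy to test.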
Bruce
Thanks for the response, Bruce. The queue I currently have is non-transactional; sorry for the confusion. I don't want to do lookups against the DB, as that can end up being very expensive. I am, however, going to try serializing it in memory. But as you pointed out, it is going to be tricky, since multiple threads will be trying to access the data pulled from the queue. I was thinking of a static Dictionary whose key is the uniqueidentifier I use to identify messages. Do you see any concern with the Dictionary? Is there a better way to manage this list?
Check out the thread-safe structures in the 4.0 Framework. If you are not comfortable shipping on a beta, it will at least point you in the right direction.
Adam Fyles
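For reference, the thread-safe collections in .NET 4.0 live in the System.Collections.Concurrent namespace. Below is a rough sketch of how the A-before-B pairing might look with ConcurrentDictionary instead of explicit locks. It assumes exactly one A and one B per id; the `MyMessage` shape, the `isA` flag, the `PairingHandler` class and the injected insert delegate are hypothetical stand-ins, not code from this thread.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical message shape, for illustration only.
public class MyMessage
{
    public int id;
    public bool isA;
}

public class PairingHandler
{
    // One entry per id. A null value marks "A is already in the database";
    // a non-null value is a B that is waiting for its A.
    private readonly ConcurrentDictionary<int, MyMessage> state =
        new ConcurrentDictionary<int, MyMessage>();
    private readonly Action<MyMessage> insertIntoDb;

    public PairingHandler(Action<MyMessage> insertIntoDb)
    {
        this.insertIntoDb = insertIntoDb;
    }

    public void Handle(MyMessage message)
    {
        if (message.isA)
        {
            // A always goes in first, before anything is published.
            insertIntoDb(message);
            if (!state.TryAdd(message.id, null))
            {
                // TryAdd failed, so a B was already parked: take it and insert it.
                MyMessage waitingB;
                if (state.TryRemove(message.id, out waitingB) && waitingB != null)
                {
                    insertIntoDb(waitingB);
                }
            }
        }
        else
        {
            if (!state.TryAdd(message.id, message))
            {
                // TryAdd failed, so the "A done" marker exists: clear it and insert B.
                MyMessage marker;
                state.TryRemove(message.id, out marker);
                insertIntoDb(message);
            }
            // Otherwise B stays parked until its A arrives.
        }
    }
}
```

Because only one of two competing TryAdd calls for the same id can succeed, whichever thread loses knows the other half of the pair has already arrived. As with the locked version, a message whose partner never shows up stays in the dictionary.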
Static dictionary is not thread-safe. I'll update my answer with some sample code.
Bruce
Before I try writing code, can you clarify one point? Is your thought that the message contains a unique identifier, and that A and B both have the same identifier? So if you receive messages A1, A2, A3 and B1, B2, B3 you know that A1 goes with B1, etc.? I hope that is the case, because with multiple threads receiving messages at the same time, I don't know how else you'll know which A goes with which B.
Bruce
+1  A: 

Hi Dan,

If you're the only client of those queues, you could very easily add a timestamp as a message header (see the IDesign sample) and save that Sent On field (kind of like an Outlook message) in the database as well. You could then process the rows in the order they were sent (basically, you move the sorting logic to the time of consumption).
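
To illustrate the consumption side, here is a minimal sketch of sorting on read. The `StoredMessage` type, its `SentOn` and `Body` fields, and the `MessageReader` helper are assumptions made for this example; in practice the same thing would more likely be an ORDER BY on the Sent On column in the query itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape: the message body plus the timestamp header
// that was saved alongside it.
public class StoredMessage
{
    public string Body;
    public DateTime SentOn;
}

public static class MessageReader
{
    // Returns the rows in the order they were sent, regardless of the
    // order in which the processing threads inserted them.
    public static List<StoredMessage> InSendOrder(IEnumerable<StoredMessage> rows)
    {
        // OrderBy is a stable sort, so rows with equal timestamps keep
        // their input order.
        return rows.OrderBy(r => r.SentOn).ToList();
    }
}
```

This only gives a reliable order if the timestamps are fine-grained enough to distinguish consecutive messages and come from a single clock.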

Hope this helps, Adrian

Adrian Stoica