I have a C# app that subscribes to a topic on our messaging system for value updates. When a new value comes in, I do some processing and then carry on. The problem is, the updates can come faster than the app can process them. What I want to do is to just hold on to the latest value, so I don't want a queue. For example, the source publishes value "1" and my app receives it; while processing, the source publishes the sequence (2, 3, 4, 5) before my app is done processing; my app then processes value "5", with the prior values thrown away.

It's kind of hard to post a working code sample since it's based on proprietary messaging libraries, but I would think this is a common pattern; I just can't figure out what it's called. It seems like the processing function has to run on a different thread from the messaging callback, but I'm not sure how to organize this, e.g. how that thread is notified of a value change. Any general tips on what I need to do?

A: 

This is not a "pattern", but you could use a shared data structure to hold the value. If there is only one value received from the messaging library, then a simple object would do. Otherwise you might be able to use a hashtable to store multiple message values (if required).

For example, on the message receive thread: when a message comes in, add or update its value in the data structure. On the processing thread, periodically check the data structure to make sure you still have the same value. If you do not, discard any processing you have already done and re-process with the new value.

Of course, you will need to ensure the data structure is properly synchronized between threads.
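One way to sketch the "check whether the value is still the same" idea is to store a version counter next to the value. This is only a minimal sketch: the class name `LatestValue<T>` and its members are hypothetical, not part of any messaging library, and a plain `lock` is assumed to be sufficient synchronization.

```csharp
using System;

// Hypothetical holder for the most recent value; all names are illustrative.
public sealed class LatestValue<T>
{
    private readonly object _gate = new object();
    private T _value;
    private long _version;   // incremented on every write

    // Called from the messaging callback: overwrite whatever was stored.
    public void Set(T value)
    {
        lock (_gate)
        {
            _value = value;
            _version++;
        }
    }

    // Called from the processing thread: read the value and its version together.
    public long Read(out T value)
    {
        lock (_gate)
        {
            value = _value;
            return _version;
        }
    }

    // Lets the processing thread ask whether a newer value arrived since its last Read.
    public bool HasChangedSince(long version)
    {
        lock (_gate)
        {
            return _version != version;
        }
    }
}
```

The processing thread would call `Read`, do its work in steps, and call `HasChangedSince` between steps to decide whether to abandon the current work and start over with the newer value.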

Justin Ethier
+1  A: 

Generally speaking, one uses a messaging system to prevent losing messages. My initial reaction for a solution would be a thread that receives the inbound data and tries to pass it to your processing thread. If the processing thread is already running, you drop the data, wait for the next element, and repeat.
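A rough sketch of that drop-if-busy hand-off, assuming an atomic flag guards the processor and the work runs on the thread pool via `Task.Run`. The class name `DropIfBusy<T>` and the method `TrySubmit` are made up for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical dispatcher: accepts a value only when no processing is in progress.
public sealed class DropIfBusy<T>
{
    private readonly Action<T> _handler;
    private int _busy;   // 0 = idle, 1 = processing

    public DropIfBusy(Action<T> handler)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    // Returns true if the value was handed to a worker, false if it was dropped.
    public bool TrySubmit(T value)
    {
        // Atomically claim the flag; if it was already 1, a worker is still running.
        if (Interlocked.CompareExchange(ref _busy, 1, 0) != 0)
            return false;

        Task.Run(() =>
        {
            try { _handler(value); }
            finally { Interlocked.Exchange(ref _busy, 0); }   // release even if the handler throws
        });
        return true;
    }
}
```

Note that this variant discards the values that arrive while the processor is busy, so the last value of a burst can be lost. If the newest value must always be processed eventually, a single overwritable slot is a better fit.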

Lazarus
A: 

Obviously, the design of the messaging library can influence the best way to handle this problem. The way I've done it in the past, with libraries that function somewhat similarly, is to have a thread that listens for events and places them into a Queue, and then have ThreadPool workers that dequeue the messages and process them.

You can read up on multithreaded asynchronous job queues:

Multithreaded Job Queue

Work Queue Threading
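For reference, a queue-plus-workers arrangement might look roughly like the following sketch, which assumes `BlockingCollection<T>` as the queue and long-running tasks as the workers. `WorkQueue<T>` is a hypothetical name. Unlike a keep-only-the-latest slot, this processes every message.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical work queue: every enqueued item is processed by one of the workers.
public sealed class WorkQueue<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Task[] _workers;

    public WorkQueue(int workerCount, Action<T> handler)
    {
        _workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            _workers[i] = Task.Factory.StartNew(() =>
            {
                // Blocks until an item is available; ends once CompleteAdding is called
                // and the queue has been drained.
                foreach (T item in _queue.GetConsumingEnumerable())
                    handler(item);
            }, TaskCreationOptions.LongRunning);
        }
    }

    // Called from the listener thread.
    public void Enqueue(T item) => _queue.Add(item);

    // Stops accepting items, waits for the workers to drain the queue, then cleans up.
    public void Dispose()
    {
        _queue.CompleteAdding();
        Task.WaitAll(_workers);
        _queue.Dispose();
    }
}
```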

NebuSoft
+2  A: 

A very simple way could be something like:

private IMessage _next;

public void ReceiveMessage(IMessage message)
{
    Interlocked.Exchange(ref _next, message);
}

public void Process()
{
    IMessage next = Interlocked.Exchange(ref _next, null);

    if (next != null)
    {
        //...
    }
}
winSharp93
+1, though if I understand the question correctly, the first method would be more of a receiver than a sender.
Jeff Sternal
I just changed it. Thanks!
winSharp93
thanks, I think I get the gist of it, but how is Process() invoked? does it have to run in some other thread in some sort of loop, or is there any way to invoke it upon receiving a new message, if it's not already processing a previous value?
toasteroven
You will need at least two threads: one receiving the messages and one processing them. In some cases the receiver could also be another process that writes them into a database, etc. To notify the processing thread, you could use an AutoResetEvent that you set in ReceiveMessage.
winSharp93
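Putting the two pieces from this answer together (the `Interlocked.Exchange` slot and an `AutoResetEvent` to wake the processor), a dedicated worker thread could be sketched as below. The class `LatestMessageWorker<T>` and the `process` delegate are assumptions made for illustration; the real callback signature depends on the messaging library.

```csharp
using System;
using System.Threading;

// Hypothetical worker that keeps only the newest message; names are illustrative.
public sealed class LatestMessageWorker<T> : IDisposable where T : class
{
    private readonly Action<T> _process;
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Thread _thread;
    private T _next;
    private volatile bool _stopping;

    public LatestMessageWorker(Action<T> process)
    {
        _process = process ?? throw new ArgumentNullException(nameof(process));
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    // Messaging callback: overwrite the slot, then wake the worker.
    public void ReceiveMessage(T message)
    {
        Interlocked.Exchange(ref _next, message);
        _signal.Set();
    }

    private void Run()
    {
        while (true)
        {
            _signal.WaitOne();          // sleeps until ReceiveMessage or Dispose signals
            if (_stopping) return;

            // Take the newest value; anything overwritten while we were busy is gone.
            T next = Interlocked.Exchange<T>(ref _next, null);
            if (next != null)
                _process(next);
        }
    }

    // Stops the worker; a message still sitting in the slot is not processed.
    public void Dispose()
    {
        _stopping = true;
        _signal.Set();
        _thread.Join();
        _signal.Dispose();
    }
}
```

Because the event stays signaled if `Set` is called while the worker is busy, the worker loops straight back around after finishing and picks up whatever value is newest at that point. The null check covers the case where a wake-up arrives after the slot has already been emptied.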
A: 

A simple way is to use a member variable to hold the last value received, and wrap it with a lock. Another way is to push incoming values onto a stack. When you're ready for a new value, call Stack.Pop() and then Stack.Clear():

public static class Incoming
{
    private static readonly object locker = new object();
    private static object lastMessage = null;

    public static object GetMessage()
    {
        lock (locker)
        {
            object tempMessage = lastMessage;
            lastMessage = null;
            return tempMessage;
        }
    }
    public static void SetMessage(object messageArg)
    {
        lock (locker)
        {
            lastMessage = messageArg;
        }
    }

    private static Stack<object> messageStack = new Stack<object>();
    public static object GetMessageStack()
    {
        lock (locker)
        {
            object tempMessage = messageStack.Count > 0 ? messageStack.Pop() : null;
            messageStack.Clear();
            return tempMessage;
        }
    }
    public static void SetMessageStack(object messageArg)
    {
        lock (locker)
        {
            messageStack.Push(messageArg);
        }
    }
}

Putting the processing functions on a separate thread is a good idea. Either use a callback method from the processing thread to signal that it's ready for another message, or have it signal that it's done and then have the main thread start a new processor thread when a message is received (via the above SetMessage...).
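The second option (start a processor when a message arrives, unless one is already running) could be sketched as follows. This is one possible arrangement under stated assumptions, not the only one: `OnDemandProcessor<T>` is a hypothetical name, and it uses `Task.Run` on the thread pool rather than creating a new thread each time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical processor that starts a worker on demand and keeps only the newest message.
public sealed class OnDemandProcessor<T> where T : class
{
    private readonly Action<T> _process;
    private T _latest;
    private int _running;   // 0 = no worker active, 1 = worker active

    public OnDemandProcessor(Action<T> process)
    {
        _process = process ?? throw new ArgumentNullException(nameof(process));
    }

    // Messaging callback: store the newest value and make sure a worker is running.
    public void SetMessage(T message)
    {
        Interlocked.Exchange(ref _latest, message);
        TryStart();
    }

    private void TryStart()
    {
        // Only the caller that flips 0 -> 1 starts a worker.
        if (Interlocked.CompareExchange(ref _running, 1, 0) == 0)
            Task.Run(() => Drain());
    }

    private void Drain()
    {
        try
        {
            T next;
            // Keep going while newer values keep arriving.
            while ((next = Interlocked.Exchange<T>(ref _latest, null)) != null)
                _process(next);
        }
        finally
        {
            Interlocked.Exchange(ref _running, 0);
        }

        // A value may have arrived after the loop saw null but before the flag was cleared.
        if (Volatile.Read(ref _latest) != null)
            TryStart();
    }
}
```

The re-check after clearing the flag is what keeps a message from being stranded when it arrives just as the worker is shutting down.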

ebpower