I'm using a subscriber/notifier pattern to raise and consume events from my .NET middle tier in C#. Some of the events are raised in "bursts", for instance when data is persisted by a batch program importing a file. This executes a potentially long-running task, and I'd like to avoid firing the event several times a second by implementing a "quiet period", whereby the event system waits until the event stream slows down before processing the event.

How should I do this when the publisher takes an active role in notifying subscribers? I don't want to wait until an event comes in just to check whether there are others waiting out the quiet period...

There is no host process to poll the subscription model at the moment. Should I abandon the publish/subscribe pattern, or is there a better way?

A: 

I am not sure if I understood your question correctly, but I would try to fix the problem at the source - make sure the events are not raised in "bursts". You could consider implementing batch operations, which the file-importing program could use. The whole batch would then be treated as a single operation in your middle tier and raise a single event.
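
For illustration, a batch operation on a hypothetical repository might look something like this (SaveBatch and OnBatchSaved are made-up names, not part of the original suggestion):

// Hypothetical batch API: the importer calls SaveBatch once, the middle
// tier saves every element, and a single event is raised for the whole file.
public void SaveBatch(IEnumerable<T> dataObjects)
{
    foreach (var dataObject in dataObjects)
    {
        // perform save logic for each element; no per-element event is raised
    }

    this.OnBatchSaved(dataObjects); // one notification for the entire import
}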

I think it will be very tricky to implement a reasonable solution if you can't make the change outlined above, but you could try to wrap your publisher in a "caching" publisher that implements some heuristic to coalesce the events when they arrive in bursts. The easiest heuristic is to cache an event if another one of the same type is currently being processed (so your batch would cause at least two events: one at the very beginning and one at the end). Alternatively, you could wait for a short time and only raise an event when no further event has arrived during that window, but then you incur a time lag even when there is a single event in the pipeline. You also need to make sure you still raise the event from time to time even if there is a constant stream of events; otherwise the subscribers will potentially get starved.

The second option is tricky to implement and will rely on heuristics that might go very wrong, but a rough sketch of it is below...
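
For what it's worth, here is one way that second option could look: a wrapper that holds the latest event until the stream has been quiet for a while, with a maximum latency so subscribers can't be starved. All the names (BurstCoalescingPublisher, Post, quietPeriodMs, maxLatency) are made up for illustration:

using System;
using System.Threading;

class BurstCoalescingPublisher<TEventArgs> where TEventArgs : EventArgs
{
    // raised once the stream has been quiet for quietPeriodMs, or when
    // maxLatency has elapsed since the first buffered event
    public event EventHandler<TEventArgs> Published;

    private readonly int _quietPeriodMs;
    private readonly TimeSpan _maxLatency;
    private readonly Timer _timer;
    private readonly object _lock = new object();
    private TEventArgs _pending;
    private DateTime _firstPendingUtc;

    public BurstCoalescingPublisher(int quietPeriodMs, TimeSpan maxLatency)
    {
        _quietPeriodMs = quietPeriodMs;
        _maxLatency = maxLatency;
        _timer = new Timer(delegate { Flush(); }, null, Timeout.Infinite, Timeout.Infinite);
    }

    // call this for every raw event coming from the real publisher
    public void Post(TEventArgs e)
    {
        lock (_lock)
        {
            if (_pending == null)
                _firstPendingUtc = DateTime.UtcNow;
            _pending = e; // keep only the latest event of the burst

            if (DateTime.UtcNow - _firstPendingUtc >= _maxLatency)
                FlushLocked(); // don't let a constant stream starve subscribers
            else
                _timer.Change(_quietPeriodMs, Timeout.Infinite); // restart the quiet-period countdown
        }
    }

    private void Flush()
    {
        lock (_lock) { FlushLocked(); }
    }

    private void FlushLocked()
    {
        if (_pending == null)
            return;
        var e = _pending;
        _pending = null;
        _timer.Change(Timeout.Infinite, Timeout.Infinite);
        var published = this.Published; // local copy avoids the null-check race
        if (published != null)
            published(this, e);
    }
}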

Grzenio
I want the notifying process to be able to raise events very often, but have the subscriber decide whether to care about the events itself. One subscriber might want twenty events per second, another might want to be notified less frequently...
Chris McCall
Ah ok, I understand now. If you want to use the publish/subscribe pattern, then I think the subscribers will need to implement caching of the events. You could consider creating different types of events, e.g. one that fires for every single element and another that fires once for a whole batch. Otherwise, I think I would implement caching of events inside each of the subscribers.
Grzenio
Arguably, `frequency` can be a property of the subscription, forcing the publisher to send out notifications at different intervals for different subscribers.
Steven Sudit
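
For illustration, that "frequency as a property of the subscription" idea might look something like the sketch below (all names made up; not thread-safe, just a sketch):

using System;
using System.Collections.Generic;

class ThrottlingPublisher<TEventArgs> where TEventArgs : EventArgs
{
    private class Subscription
    {
        public EventHandler<TEventArgs> Handler;
        public TimeSpan MinInterval;      // per-subscriber notification frequency
        public DateTime LastNotifiedUtc;
    }

    private readonly List<Subscription> _subscriptions = new List<Subscription>();

    public void Subscribe(EventHandler<TEventArgs> handler, TimeSpan minInterval)
    {
        _subscriptions.Add(new Subscription { Handler = handler, MinInterval = minInterval });
    }

    public void Publish(TEventArgs e)
    {
        var now = DateTime.UtcNow;
        foreach (var s in _subscriptions)
        {
            // each subscriber is notified no more often than it asked for
            if (now - s.LastNotifiedUtc >= s.MinInterval)
            {
                s.LastNotifiedUtc = now;
                s.Handler(this, e);
            }
        }
    }
}
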
A: 

Here's a rough implementation that might point you in a direction. In my example, the task that involves notification is saving a data object. When an object is saved, the Saved event is raised. In addition to a simple Save method, I've implemented BeginSave and EndSave methods as well as an overload of Save that works with those two for batch saves. When EndSave is called, a single BatchSaved event is fired.

Obviously, you can alter this to suit your needs. In my example, I kept track of a list of all objects that were saved during a batch operation, but this may not be something that you'd need to do; you may only care about how many objects were saved, or simply that a batch save operation was completed. If you anticipate a large number of objects being saved, then storing them in a list as in my example may become a memory issue.

EDIT: I added a "threshold" concept to my example that attempts to prevent a large number of objects from being held in memory. This causes the BatchSaved event to fire more frequently, though. I also added some locking to address potential thread-safety issues, though I may have missed something there.

using System;
using System.Collections.Generic;
using System.Threading;

class DataConcierge<T>
{
    // *************************
    // Simple save functionality
    // *************************

    public void Save(T dataObject)
    {
        // perform save logic

        this.OnSaved(dataObject);
    }

    public event DataObjectSaved<T> Saved;

    protected void OnSaved(T dataObject)
    {
        var saved = this.Saved;
        if (saved != null)
            saved(this, new DataObjectEventArgs<T>(dataObject));
    }

    // ************************
    // Batch save functionality
    // ************************

    Dictionary<BatchToken, List<T>> _BatchSavedDataObjects = new Dictionary<BatchToken, List<T>>();
    System.Threading.ReaderWriterLockSlim _BatchSavedDataObjectsLock = new System.Threading.ReaderWriterLockSlim();

    int _SavedObjectThreshold = 17; // when a batch's list reaches this many objects, the BatchSaved event is raised and the list is cleared

    public BatchToken BeginSave()
    {
        // create a batch token to represent this batch
        BatchToken token = new BatchToken();

        _BatchSavedDataObjectsLock.EnterWriteLock();
        try
        {
            _BatchSavedDataObjects.Add(token, new List<T>());
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitWriteLock();
        }
        return token;
    }

    public void EndSave(BatchToken token)
    {
        List<T> batchSavedDataObjects;
        _BatchSavedDataObjectsLock.EnterWriteLock();
        try
        {
            if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");

            this.OnBatchSaved(batchSavedDataObjects); // this causes a single BatchSaved event to be fired

            if (!_BatchSavedDataObjects.Remove(token))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitWriteLock();
        }
    }

    public void Save(BatchToken token, T dataObject)
    {
        List<T> batchSavedDataObjects;
        // the read lock prevents EndSave from executing before this Save method has a chance to finish executing
        _BatchSavedDataObjectsLock.EnterReadLock();
        try
        {
            if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");

            // perform save logic

            this.OnBatchSaved(batchSavedDataObjects, dataObject);
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitReadLock();
        }
    }

    public event BatchDataObjectSaved<T> BatchSaved;

    protected void OnBatchSaved(List<T> batchSavedDataObjects)
    {
        lock (batchSavedDataObjects)
        {
            var batchSaved = this.BatchSaved;
            if (batchSaved != null)
                batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects));
        }
    }

    protected void OnBatchSaved(List<T> batchSavedDataObjects, T savedDataObject)
    {
        // add the data object to the list storing the data objects that have been saved for this batch
        lock (batchSavedDataObjects)
        {
            batchSavedDataObjects.Add(savedDataObject);

            // if the threshold has been reached
            if (_SavedObjectThreshold > 0 && batchSavedDataObjects.Count >= _SavedObjectThreshold)
            {
                // then raise the BatchSaved event with the data objects that we currently have
                var batchSaved = this.BatchSaved;
                if (batchSaved != null)
                    batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects.ToArray()));

                // and clear the list to ensure that we are not holding on to the data objects unnecessarily
                batchSavedDataObjects.Clear();
            }
        }
    }
}

class BatchToken
{
    static int _LastId = 0;
    static object _IdLock = new object();

    static int GetNextId()
    {
        lock (_IdLock)
        {
            return ++_LastId;
        }
    }

    public BatchToken()
    {
        this.Id = GetNextId();
    }

    public int Id { get; private set; }
}

class DataObjectEventArgs<T> : EventArgs
{
    public T DataObject { get; private set; }

    public DataObjectEventArgs(T dataObject)
    {
        this.DataObject = dataObject;
    }
}

delegate void DataObjectSaved<T>(object sender, DataObjectEventArgs<T> e);

class BatchDataObjectEventArgs<T> : EventArgs
{
    public IEnumerable<T> DataObjects { get; private set; }

    public BatchDataObjectEventArgs(IEnumerable<T> dataObjects)
    {
        this.DataObjects = dataObjects;
    }
}

delegate void BatchDataObjectSaved<T>(object sender, BatchDataObjectEventArgs<T> e);

In my example, I choose to use a token concept in order to create separate batches. This allows smaller batch operations running on separate threads to complete and raise events without waiting for a larger batch operation to complete.

I made separate events: Saved and BatchSaved. However, these could just as easily be consolidated into a single event.
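
For instance, a consolidated version might look something like this (hypothetical; a single save simply becomes a batch of one):

// Hypothetical consolidation: one event that always carries a collection,
// so a single save is just a batch of one.
public event BatchDataObjectSaved<T> Saved;

protected void OnSaved(params T[] dataObjects)
{
    var saved = this.Saved; // local copy avoids the null-check race
    if (saved != null)
        saved(this, new BatchDataObjectEventArgs<T>(dataObjects));
}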

EDIT: fixed race conditions pointed out by Steven Sudit on accessing the event delegates.

EDIT: revised locking code in my example to use ReaderWriterLockSlim rather than Monitor (i.e. the "lock" statement). I think there were a couple of race conditions, such as between the Save and EndSave methods. It was possible for EndSave to execute, causing the list of data objects to be removed from the dictionary. If the Save method was executing at the same time on another thread, it would be possible for a data object to be added to that list, even though it had already been removed from the dictionary.

In my revised example, this situation can't happen and the Save method will throw an exception if it executes after EndSave. These race conditions were caused primarily by me trying to avoid what I thought was unnecessary locking. I realized that more code needed to be within a lock, but decided to use ReaderWriterLockSlim instead of Monitor because I only wanted to prevent Save and EndSave from executing at the same time; there wasn't a need to prevent multiple threads from executing Save at the same time. Note that Monitor is still used to synchronize access to the specific list of data objects retrieved from the dictionary.

EDIT: added usage example

Below is a usage example for the above sample code.

    static void DataConcierge_Saved(object sender, DataObjectEventArgs<Customer> e)
    {
        Console.WriteLine("DataConcierge<Customer>.Saved");
    }

    static void DataConcierge_BatchSaved(object sender, BatchDataObjectEventArgs<Customer> e)
    {
        // Count() is an extension method and requires "using System.Linq;"
        Console.WriteLine("DataConcierge<Customer>.BatchSaved: {0}", e.DataObjects.Count());
    }

    static void Main(string[] args)
    {
        DataConcierge<Customer> dc = new DataConcierge<Customer>();
        dc.Saved += new DataObjectSaved<Customer>(DataConcierge_Saved);
        dc.BatchSaved += new BatchDataObjectSaved<Customer>(DataConcierge_BatchSaved);

        var token = dc.BeginSave();
        try
        {
            for (int i = 0; i < 100; i++)
            {
                var c = new Customer();
                // ...
                dc.Save(token, c);
            }
        }
        finally
        {
            dc.EndSave(token);
        }
    }

This resulted in the following output:

DataConcierge<Customer>.BatchSaved: 17
DataConcierge<Customer>.BatchSaved: 17
DataConcierge<Customer>.BatchSaved: 17
DataConcierge<Customer>.BatchSaved: 17
DataConcierge<Customer>.BatchSaved: 17
DataConcierge<Customer>.BatchSaved: 15

The threshold in my example is set to 17, so a batch of 100 items causes the BatchSaved event to fire 6 times.

Dr. Wily's Apprentice
Standard complaint: Don't check the delegate for null and then call it, as that presents a race condition. Instead, make a local copy, check it for null and then call through it.
Steven Sudit
Ah, yes. Thanks for pointing that out. I've updated per your suggestion.
Dr. Wily's Apprentice
A: 

Here's one idea that's just fallen out of my head. I don't know how workable it is and can't see an obvious way to make it more generic, but it might be a start. All it does is provide a buffer for button click events (substitute with your event as necessary).

using System;
using System.Collections.Generic;
using System.Windows.Forms;

class ButtonClickBuffer
{
    public event EventHandler BufferedClick;

    public ButtonClickBuffer(Button button, int queueSize)
    {
        this.queueSize = queueSize;
        button.Click += this.button_Click;
    }

    private int queueSize;
    private List<EventArgs> queuedEvents = new List<EventArgs>();

    private void button_Click(object sender, EventArgs e)
    {
        queuedEvents.Add(e);
        if (queuedEvents.Count >= queueSize)
        {
            // local copy avoids a race on the delegate (see Steven Sudit's comment above)
            var bufferedClick = this.BufferedClick;
            if (bufferedClick != null)
            {
                foreach (var args in this.queuedEvents)
                {
                    bufferedClick(sender, args);
                }
            }
            // clear even when no one is subscribed, so the queue can't grow without bound
            queuedEvents.Clear();
        }
    }
}

So your subscriber, instead of subscribing as:

this.button1.Click += this.button1_Click;

Would use a buffer, specifying how many events to wait for:

ButtonClickBuffer buffer = new ButtonClickBuffer(this.button1, 5);
buffer.BufferedClick += this.button1_Click;

It works in a simple test form I knocked up, but it's far from production-ready!

You said you didn't want to wait for an event to come in to see if there is a queue waiting, which is exactly what this does. You could instead substitute the logic inside the buffer to spawn a new thread which monitors the queue and dispatches events as necessary; a rough sketch of that idea follows. God knows what threading and locking issues might arise from that!
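
Something like the following, perhaps - a dedicated background thread that waits for the queue to go quiet before dispatching (all names made up, untested):

using System;
using System.Collections.Generic;
using System.Threading;

class BufferedEventDispatcher
{
    public event EventHandler Dispatched;

    private readonly Queue<EventArgs> _queue = new Queue<EventArgs>();
    private readonly object _lock = new object();
    private readonly int _quietMs;

    public BufferedEventDispatcher(int quietMs)
    {
        _quietMs = quietMs;
        var worker = new Thread(DispatchLoop) { IsBackground = true };
        worker.Start();
    }

    // hook this up to the raw event, e.g. button.Click += dispatcher.Enqueue;
    public void Enqueue(object sender, EventArgs e)
    {
        lock (_lock)
        {
            _queue.Enqueue(e);
            Monitor.Pulse(_lock); // wake the dispatcher thread
        }
    }

    private void DispatchLoop()
    {
        while (true)
        {
            EventArgs[] batch;
            lock (_lock)
            {
                while (_queue.Count == 0)
                    Monitor.Wait(_lock); // sleep until an event arrives

                // quiet period: keep waiting while events are still trickling in
                int countBefore;
                do
                {
                    countBefore = _queue.Count;
                    Monitor.Wait(_lock, _quietMs);
                } while (_queue.Count > countBefore);

                batch = _queue.ToArray();
                _queue.Clear();
            }

            var handler = Dispatched; // local copy avoids the null-check race
            if (handler != null)
                foreach (var e in batch)
                    handler(this, e);
        }
    }
}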

batwad