views:

70

answers:

2

When using a single threadded loop I was easily able to limit my messages sent per second by putting the thread to sleep (ie Thread.Sleep(1000/MessagesPerSecond)), easy enough... but now that I have expanded into parallel threads this no longer works properly.

Does anyone have a suggestion how to throttle messages sent when using Parallel threads?

Parallel.For(0, NumberOfMessages, delegate(int i) {

   // Code here

   if (MessagesPerSecond != 0)
      Thread.Sleep(1000/MessagesPerSecond);
});
+4  A: 

Use an AutoResetEvent and a timer. Whenever the timer fires, have it Set the AutoResetEvent.

Then have your process that sends messages WaitOne on the AutoResetEvent immediately before sending.

    private static readonly AutoResetEvent _Next = new AutoResetEvent(true);
    private static Timer _NextTimer;

    private static void SendMessages(IEnumerable<Message> messages)
    {
        if (_NextTimer == null)
            InitializeTimer();

        Parallel.ForEach(
            messages,
            m =>
            {
                _Next.WaitOne();
                // Do something
            }
            );
    }

    private static void SetNext(object state)
    {
        _Next.Set();
    }
Toby
+2  A: 

You might consider using a shared ConcurrentQueue, which your parallel loop would populate with prepared messages. Use the System.Threading.Timer to pull messages from the queue at your desired interval and send them. Note that this design only make sense if creating the messages to be sent is expensive; if the actual sending of the messages is the expensive part, there is no reason to run the loop in parallel.

If you need to stop the timer after the messages have been sent, you'll have to do some additional work, but this design works well for a throttled message sender that has to handle asynchronous message queuing. Another boundary case to consider is 'message pile-up', where messages are queued up faster than they can be processed. You might want to consider generating an error in this case (as it may indicate a bug) or using a BlockingCollection.

Dan Bryant