I'm planning to design a class that limits how many times a function can execute within a specified time window, for example:

  • Process max. 5 files in 1 second

It should be thread-safe, and the performance hit should be minimal.

How would you design such a class? I've got a couple of ideas, but none of them seemed right to me.

Is there any known design pattern for such a task? (I'm coding in .NET, but any language is OK.)

From the outside, the class should work like this (assuming it's a singleton):

Setup:

Limiter.Instance.MaxExecutionPerSecond = 5

Then we would call this from each thread before executing our function; if required, it would block the thread:

Limiter.Instance.WaitIfRequired()
+2  A: 

Something like this?

using System.Collections.Generic;
using System.Threading;
using Timer = System.Threading.Timer;

class Limiter{
    public static readonly Limiter Instance = new Limiter();

    Limiter(){}

    int         max;
    Semaphore   counter;
    List<Timer> timers  = new List<Timer>();

    // warning: not thread safe!
    public int MaxExecutionPerSecond{
        get{return max;}
        set{counter = new Semaphore(max = value, value);}
    }

    public void WaitIfRequired(){
        // Our semaphore starts with a count of MaxExecutionPerSecond.
        // When we call WaitOne(), it decrements the count.  If the count
        // is already zero, the call to WaitOne() will block until another
        // thread calls Release() to increment the count.
        counter.WaitOne();

        // Set a timer to increment the semaphore in one second.
        Timer t = null;
        t = new Timer(o=>{
            // Increment the semaphore.
            counter.Release();

            // We no longer need to protect this timer from the GC.
            // List<T> is not thread safe, so guard it with a lock.
            lock(timers)
                timers.Remove(t);
            t.Dispose();
        });

        // Hold a reference to this timer to keep it from being garbage collected.
        lock(timers)
            timers.Add(t);

        // Set the timer to release the semaphore in one second.
        t.Change(1000, Timeout.Infinite);
    }
}

EDIT

One thing to keep in mind is that the above code only limits how many threads start per second. If the threads are long running, it's still possible to have many threads running at once. For instance, if you start 5 threads per second, but each thread runs for 2 seconds, then at any given time after the first 2 seconds, you'll theoretically have 10 threads running.

If you want to make sure you never have more than 5 threads running at once, the simplest thing to do is to dispense with the custom Limiter class, and just use a Semaphore directly.

const int maxThreadCount = 5;
static Semaphore counter = new Semaphore(maxThreadCount, maxThreadCount);

static void NewThread(object state){
    counter.WaitOne();
    try{
        // do something
    }
    finally{
        // Release even if the work throws, so the slot isn't lost.
        counter.Release();
    }
}

Now that's just about as simple as it can be. One caveat though: creating new threads and immediately blocking them is generally considered a bad idea. This uses system resources to create a thread that's not doing anything. It's better to queue up requests and start up new threads (or better yet, use thread pool threads) to handle them only when they are eligible to run. This is more complicated and harder to get right. Depending on the design, it might require an additional thread for a scheduler/manager. Something like this:

using System.Collections.Generic;
using System.Threading;

class Limiter{
    class WorkData{
        readonly ParameterizedThreadStart action;
        readonly object                   data;

        public ParameterizedThreadStart Action{get{return action;}}
        public object                   Data  {get{return data;}}

        public WorkData(ParameterizedThreadStart action, object data){
            this.action = action;
            this.data   = data;
        }
    }

    readonly Semaphore       threadCount;
    readonly Queue<WorkData> workQueue   = new Queue<WorkData>();
    readonly Semaphore       queueCount  = new Semaphore(0, int.MaxValue);

    public Limiter(int maxThreadCount){
        threadCount = new Semaphore(maxThreadCount, maxThreadCount);
        Thread t = new Thread(StartWorkItems);
        t.IsBackground = true;
        t.Start();
    }

    void StartWorkItems(object ignored){
        while(queueCount.WaitOne() && threadCount.WaitOne()){
            WorkData wd;
            lock(workQueue)
                wd = workQueue.Dequeue();

            ThreadPool.QueueUserWorkItem(DoWork, wd);
        }
    }
    void DoWork(object state){
        WorkData wd = (WorkData)state;
        try{
            wd.Action(wd.Data);
        }
        finally{
            // Free the slot so the scheduler can start the next item.
            threadCount.Release();
        }
    }

    public void QueueWork(ParameterizedThreadStart action, object data){
        lock(workQueue)
            workQueue.Enqueue(new WorkData(action, data));
        queueCount.Release();
    }
}

In this class, I've removed the singleton property and given the constructor a maxThreadCount parameter. This sidesteps the thread-safety problem in the property setter of the first class. There are other ways thread safety can be added, but this was the simplest.

P Daddy
Really smart implementation, thanks. Any idea about the cost of creating, let's say, 100 timers per second?
dr. evil
I would expect the cost to be negligible. Neither the semaphore nor the timers are particularly expensive. Direct profiling of this code is somewhat difficult, of course, but as a simple test, I set `MaxExecutionPerSecond` to 100 and called `WaitIfRequired()` 10,000 times. Total execution time was approximately 99 seconds, as expected, and Task Manager showed 0 seconds of CPU time for the process.
P Daddy
I really like that there is no Thread.Sleep() involved.
dr. evil
A: 

I assume that getting the current time is inexpensive compared to whatever it is that the function would be doing.

In that case, I've done something like this in Scala, and the design pattern would go something like:

  • WaitIfRequired should block if already executing. (It would be a "synchronized method" in Java; I'm not sure what the .NET equivalent is.)
  • Maintain a queue of times that WaitIfRequired was called.
  • If the length of the queue goes over MaxExecutionPerSecond, drop the oldest entries until it is no more than MaxExecutionPerSecond long.
  • If the queue is MaxExecutionPerSecond long, pop the oldest time off the head. If more than a second has passed since then, push the current time on the tail of the queue and return immediately; enough time has elapsed. If less than a second has passed, sleep for the amount of time needed for it to be a second (i.e. 1 second - time elapsed) before pushing and returning. If the queue is shorter than MaxExecutionPerSecond, just push the current time and return.

That's it.

You can generalize this to no more than N calls in time T by replacing "one second" with T and MaxExecutionPerSecond with N.

Now, if you don't have synchronized methods, things get a little more interesting (but only a little). You then need an external locking mechanism to make sure only one thread at a time is reading times and waiting.
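
For .NET readers, the steps above might be sketched roughly as follows. This is only an illustration under stated assumptions, not the answerer's Scala code: the class name SlidingWindowLimiter and its constructor parameters are hypothetical, the C# lock statement stands in for the Java-style synchronized method, and a Stopwatch is used as the clock.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical name and shape; a sketch of the queue-of-timestamps idea.
class SlidingWindowLimiter {
    readonly object gate = new object();
    readonly Queue<TimeSpan> stamps = new Queue<TimeSpan>();
    readonly Stopwatch clock = Stopwatch.StartNew();
    readonly int maxCalls;
    readonly TimeSpan window;

    public SlidingWindowLimiter(int maxCalls, TimeSpan window) {
        if (maxCalls < 1) throw new ArgumentOutOfRangeException("maxCalls");
        this.maxCalls = maxCalls;
        this.window = window;
    }

    public void WaitIfRequired() {
        // Only one thread at a time reads the queue and waits.
        lock (gate) {
            if (stamps.Count == maxCalls) {
                // Oldest of the last maxCalls calls.
                TimeSpan oldest = stamps.Dequeue();
                TimeSpan remaining = window - (clock.Elapsed - oldest);
                if (remaining > TimeSpan.Zero)
                    Thread.Sleep(remaining);
            }
            // Record this call after any wait.
            stamps.Enqueue(clock.Elapsed);
        }
    }
}
```

With new SlidingWindowLimiter(5, TimeSpan.FromSeconds(1)), the sixth call made within one second should block until a second has passed since the first. Sleeping while holding the lock is deliberate here: it makes later callers queue up behind the waiting thread, which is what the first bullet asks for.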

Rex Kerr
+1  A: 

For the kind of resolution you are looking for, DateTime.UtcNow is an excellent clock and very cheap. It updates with a resolution of a bit over 15 msec. Here's an example; call the Operation() method just before you perform the operation:

using System;

class Throttle {
  private int mTrigger;
  private int mOperations;
  private DateTime mStart;

  public Throttle(int maxOperationsPerSecond) {
    if (maxOperationsPerSecond < 1) throw new ArgumentException();
    mTrigger = maxOperationsPerSecond;
  }
  public void Operation() {
    mOperations += 1;
    if (mOperations > mTrigger) {
      TimeSpan span = DateTime.UtcNow - mStart;
      if (span.TotalMilliseconds < 1000)
        System.Threading.Thread.Sleep(1000 - (int)span.TotalMilliseconds);
      mOperations = 1;
    }
    if (mOperations == 1) mStart = DateTime.UtcNow;
  }
}

Create an instance of the class in each thread; don't share it.

Hans Passant
Very good one, it's not thread-safe but that's easy to add.
dr. evil
BTW, why did you say not to share it? If I don't share it, there is no point, as each thread only does one operation. The idea is to throttle the operations of multiple threads from one place.
dr. evil
I assumed you'd want to throttle one thread at a time. If you do want to share it, you'll just need to add the lock statement at the start of the Operation() method.
Hans Passant
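
A shared version along the lines of that last comment might look like the sketch below. It is an assumption about what "add the lock statement" would mean in practice, not code posted in the thread; the class name SharedThrottle and the mLock field are hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical shared variant of the Throttle class above.
class SharedThrottle {
    private readonly object mLock = new object();
    private readonly int mTrigger;
    private int mOperations;
    private DateTime mStart;

    public SharedThrottle(int maxOperationsPerSecond) {
        if (maxOperationsPerSecond < 1) throw new ArgumentException();
        mTrigger = maxOperationsPerSecond;
    }

    public void Operation() {
        // The lock serializes callers, so the counter and the start
        // time are only ever touched by one thread at a time.
        lock (mLock) {
            mOperations += 1;
            if (mOperations > mTrigger) {
                TimeSpan span = DateTime.UtcNow - mStart;
                if (span.TotalMilliseconds < 1000)
                    Thread.Sleep(1000 - (int)span.TotalMilliseconds);
                mOperations = 1;
            }
            if (mOperations == 1) mStart = DateTime.UtcNow;
        }
    }
}
```

Every thread would then call Operation() on one shared instance. Because the sleep happens inside the lock, other threads wait behind the sleeping one until the current one-second window has ended.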