views:

103

answers:

3

Hi all!

For some part of my project I need a process-local scheduling system that lets me delay method execution by a few seconds. I have thousands of “clients” of this system, so using a threading.Timer for each delay is a bad idea: I would quickly hit the OS thread limit. So I've implemented a system that uses only one thread for timing control.

The main idea is to keep a sorted queue of tasks (time + func + args + kwargs) and to use a single threading.Timer to schedule/cancel execution of the head of that queue. The scheme works, but I'm not happy with the performance: ~2000 clients scheduling dummy tasks every ~10 seconds drive the process to 40% CPU. Looking at profiler output, I see that almost all the time is spent constructing and starting new threading.Timers, and in particular on creating the new threads behind them.
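
Simplified, the current implementation looks roughly like this (a reconstruction for illustration, not the actual code):

```python
import threading
import time

class LightTimerSketch:
    """Simplified reconstruction of the current design: a sorted task
    list plus a single threading.Timer armed for the head of the list."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks = []          # sorted list of (when, func, args, kwargs)
        self._timer = None        # the one pending threading.Timer

    def schedule(self, delay, func, *args, **kwargs):
        with self._lock:
            self._tasks.append((time.time() + delay, func, args, kwargs))
            self._tasks.sort(key=lambda t: t[0])
            self._rearm()

    def _rearm(self):
        # Called with the lock held.  This is the hot spot: every schedule()
        # may cancel the Timer and start a new one, i.e. spawn a fresh OS
        # thread.  (Races around cancel() are glossed over in this sketch.)
        if self._timer is not None:
            self._timer.cancel()
        delay = max(0.0, self._tasks[0][0] - time.time())
        self._timer = threading.Timer(delay, self._fire)
        self._timer.start()

    def _fire(self):
        with self._lock:
            _, func, args, kwargs = self._tasks.pop(0)
            if self._tasks:
                self._rearm()
            else:
                self._timer = None
        func(*args, **kwargs)
```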

I believe there is a better way. My current thought is to rewrite LightTimer so that there is one execution thread controlled by a threading.Event and several timing threads that set() the event. For example:

  • I schedule a task to be called in 10 seconds. The task is added to the queue. Timing thread #1 starts time.sleep(10) before calling event.set().
  • Then I schedule a task to be called in 11 seconds. The task is appended to the queue. Nothing happens to the timing threads; thread #1 will notice the new task after it wakes up.
  • Then I schedule a task to be called in 5 seconds. The task is prepended to the queue. Timing thread #2 starts time.sleep(5), because #1 is already sleeping for a longer interval.

I hope you get the idea. What do you think of this approach? Is there a better one? Maybe I can use some Linux system feature to build an optimal solution?
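
A rough sketch of what I mean (simplified so that every schedule() gets its own one-shot timing thread; the reuse of an already-sleeping thread for tasks that are due later is left out to keep it short):

```python
import heapq
import threading
import time

_wakeup = threading.Event()
_lock = threading.Lock()
_tasks = []        # min-heap of (due_time, seq, func, args)
_seq = 0           # tie-breaker so funcs are never compared directly

def _executor():
    # The single execution thread: blocks on the Event, then runs
    # everything that is due and goes back to waiting.
    while True:
        _wakeup.wait()
        _wakeup.clear()
        while True:
            with _lock:
                if not _tasks or _tasks[0][0] > time.time():
                    break
                _, _, func, args = heapq.heappop(_tasks)
            func(*args)          # run the callback outside the lock

def schedule(delay, func, *args):
    global _seq
    with _lock:
        heapq.heappush(_tasks, (time.time() + delay, _seq, func, args))
        _seq += 1
    # One short-lived timing thread per task: sleep, then wake the executor.
    threading.Thread(target=lambda: (time.sleep(delay), _wakeup.set())).start()

threading.Thread(target=_executor, daemon=True).start()
```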

+2  A: 

An alternative is to use time.time() to calculate the absolute time at which each queued function should be executed. Put this time and the function-to-be-called in a wrapper object that overrides the comparison operators so that execution time determines the order. Then use the heapq module to maintain a min-heap. This gives you an efficient data structure where element 0 of the heap is always your next event.
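
For example (a minimal sketch; the `Task` wrapper name is illustrative):

```python
import heapq
import time

class Task:
    """Wraps an absolute execution time and a callback; ordering is by
    execution time, so heapq keeps the earliest task at index 0."""

    def __init__(self, delay, func, *args, **kwargs):
        self.time = time.time() + delay
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __lt__(self, other):
        return self.time < other.time

    def run(self):
        self.func(*self.args, **self.kwargs)

heap = []
heapq.heappush(heap, Task(10, print, "later"))
heapq.heappush(heap, Task(5, print, "sooner"))
# heap[0] is now the task due soonest (the 5-second one)
```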

One way to implement the actual calls is to execute the callbacks on a separate thread. The heap will need to be protected by a mutex, and you can use a condition variable to implement the scheduling. In an infinite loop, look up the next time a function should execute (element 0 of the heap) and call the condition variable's wait() method with the timeout set to that execution time. Your heap-insertion method can then use the condition variable's notify() method to wake the scheduling thread early whenever the newly inserted function is due before the earliest one already in the heap.
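
A sketch along those lines (the class name and details are illustrative; for brevity the scheduling thread also runs the callbacks here, rather than handing them to a separate executor thread):

```python
import heapq
import threading
import time

class Scheduler:
    """Min-heap of timed tasks guarded by a Condition; one thread waits
    with a timeout equal to the delay until the earliest entry."""

    def __init__(self):
        self._heap = []
        self._cond = threading.Condition()
        self._seq = 0                      # tie-breaker for equal times
        threading.Thread(target=self._run, daemon=True).start()

    def schedule(self, delay, func, *args, **kwargs):
        when = time.time() + delay
        with self._cond:
            heapq.heappush(self._heap, (when, self._seq, func, args, kwargs))
            self._seq += 1
            # Wake the scheduling thread: this task may now be earliest.
            self._cond.notify()

    def _run(self):
        while True:
            with self._cond:
                while not self._heap:
                    self._cond.wait()      # nothing scheduled yet
                timeout = self._heap[0][0] - time.time()
                if timeout > 0:
                    # Returns early if schedule() notify()s us first.
                    self._cond.wait(timeout)
                    continue               # re-evaluate the heap head
                _, _, func, args, kwargs = heapq.heappop(self._heap)
            func(*args, **kwargs)          # run the callback outside the lock
```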

Rakis
Hm… very interesting, thanks! In particular, I had never looked at `heapq` and hadn't thought of using `threading.Condition` for this purpose.
nailxx
+2  A: 

Have you looked at the sched module in the Python standard library? Running the scheduler on a dedicated thread, and having each scheduled action be "put a bound method and its args on a queue" from which threads in a pool peel and execute them (much as I wrote in the Nutshell chapter on threads, except that in that case there was no scheduling), should do what you want.
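
A minimal sketch of that arrangement (the `schedule` helper and pool size are illustrative; note that `s.run()` returns once its event queue empties, so a long-lived service needs to restart it or use a custom delayfunc):

```python
import queue
import sched
import threading
import time

work = queue.Queue()                     # actions to execute
s = sched.scheduler(time.time, time.sleep)

def worker():
    # Pool threads peel (func, args) pairs off the queue and run them.
    while True:
        func, args = work.get()
        func(*args)
        work.task_done()

for _ in range(4):                       # small worker pool
    threading.Thread(target=worker, daemon=True).start()

def schedule(delay, func, *args):
    # The scheduled "action" only enqueues; the workers do the real work.
    s.enter(delay, 1, work.put, ((func, args),))

fired = []
schedule(0.1, fired.append, "second")
schedule(0.05, fired.append, "first")
threading.Thread(target=s.run, daemon=True).start()   # dedicated sched thread
```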

Alex Martelli
And how should I deal with this statement from the docs: *“the scheduler class has limitations with respect to thread-safety, inability to insert a new task before the one currently pending in a running scheduler”*? Does it mean I *can* insert a new task before the currently pending one as long as I do it from the same thread, or do I have to spawn multiple threads, each running its own sched?
nailxx
@nailxx, it does require some care -- the warning is about "naive" uses, such as using the "normal" delayfunc (just time.sleep) or calling a scheduler's methods from multiple threads. With a delayfunc that does a Queue.get with a timeout on a dedicated queue (so it wakes up as soon as anything is pushed onto that queue, pulls it, and performs the scheduling method calls if needed), and with other threads pushing onto that dedicated Queue instead of calling scheduler methods directly, I have successfully used a single scheduler instance in a similar multi-threaded scenario.
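
Sketched, that might look like this (the helper names and the far-future sentinel event are illustrative assumptions, not my exact code):

```python
import queue
import sched
import threading
import time

requests = queue.Queue()   # scheduling requests from other threads

def delayfunc(timeout):
    # "Sleep" by blocking on the request queue: wakes early when another
    # thread pushes a request, applies it on the scheduler's own thread,
    # and returns so sched.run() re-examines the head of its event queue.
    try:
        delay, func, args = requests.get(timeout=timeout)
    except queue.Empty:
        return             # timed out: the head event is now due
    s.enter(delay, 1, func, args)

s = sched.scheduler(time.time, delayfunc)

def schedule(delay, func, *args):
    # Other threads call this instead of touching the scheduler directly.
    requests.put((delay, func, args))

def run_forever():
    while True:
        # Keep a far-future sentinel event so run() always has something
        # pending and therefore keeps calling delayfunc.
        s.enter(3600, 99, lambda: None, ())
        s.run()

threading.Thread(target=run_forever, daemon=True).start()
```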
Alex Martelli
A: 

You are unlikely to reach the OS thread limit with "a few thousand clients", but you may waste a lot of memory on the stacks for all those threads.

Have a look at what Twisted does: it allows a process to multiplex a lot of events (including timers) in a way that has proven to work quite well with large numbers of events.

You can also combine the event-driven and multi-process models by running several processes per machine and doing event-driven logic in each one. Say one process can handle 2,000 clients; you can still run 30 or so processes (provided there is sufficient overall resource) and gain better throughput, especially on modern multi-core hardware.

MarkR
*You are unlikely to reach the OS thread limit* — I actually did it :)
nailxx
I expect you exhausted address space with all those stacks. The default thread stack size on Linux is typically 1M (or similar), so it only takes a couple of thousand threads to use up the address space of a 32-bit process. The OS's actual thread limit is much higher.
MarkR