I'm using condition variables with a timeout in my threads. Only when I looked at CPU usage with a lot of threads running did I notice that the condition variable provided in the threading module doesn't actually sleep when a timeout is passed as an argument: it polls.

Is there an alternative to this that actually sleeps like pthreads?

It seems painful to have a lot of threads waiting at multi-second intervals and still eating CPU time.
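
To make the cost concrete, a poll-and-sleep timed wait has roughly the shape below. This is only an illustrative sketch: `polling_acquire` and its delay constants are assumptions made up for the example, not code copied from the threading module.

```python
import threading
import time

def polling_acquire(lock, timeout):
    """Try to take lock within timeout seconds by polling.

    Returns (acquired, wakeups) so the number of wakeups is visible."""
    deadline = time.time() + timeout
    delay = 0.0005                  # illustrative starting delay (assumption)
    wakeups = 0
    while True:
        wakeups += 1
        if lock.acquire(False):     # non-blocking attempt
            return True, wakeups
        remaining = deadline - time.time()
        if remaining <= 0:
            return False, wakeups
        # back off, but never sleep past the deadline or beyond the cap
        delay = min(delay * 2, remaining, 0.05)
        time.sleep(delay)
```

Every thread waiting this way wakes up repeatedly until its deadline, so the wakeups add up across many threads even though each one is short.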

Thanks!

+1  A: 

I'm not familiar with Python, but if you are able to block on a condition variable (without a timeout), you could implement the timeout yourself. Let the blocking thread store the time it began blocking and set a timer to signal it. When it wakes, it checks the elapsed time to see whether it timed out. This isn't a very good way to do it unless you can aggregate the timers into a single thread; otherwise, your thread count would double for no good reason.
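
A minimal Python sketch of this idea, under a few assumptions: `wait_with_timer` and `demo` are hypothetical names, the caller already holds the condition's lock, and other waiters on the same condition re-check their own predicate after waking (the timer calls `notify_all()`, so they may wake spuriously). It costs one `threading.Timer` thread per timed wait.

```python
import threading
import time

def wait_with_timer(cond, predicate, timeout):
    """Block on cond (which the caller must hold) until predicate() is true
    or timeout seconds pass.  Returns the final value of predicate()."""
    expired = [False]               # mutable flag shared with the timer callback

    def on_timeout():
        # Runs in the Timer thread: take the lock, mark expiry, wake waiters.
        with cond:
            expired[0] = True
            cond.notify_all()

    timer = threading.Timer(timeout, on_timeout)
    timer.daemon = True
    timer.start()
    try:
        while not predicate() and not expired[0]:
            cond.wait()             # no timeout argument is passed here
    finally:
        timer.cancel()              # harmless if the timer already fired
    return predicate()

def demo():
    """One wait that is satisfied by a notification, one that times out."""
    cond = threading.Condition()
    items = []

    def producer():
        time.sleep(0.2)
        with cond:
            items.append("job")
            cond.notify_all()

    threading.Thread(target=producer).start()
    with cond:
        got = wait_with_timer(cond, lambda: len(items) > 0, 2.0)
    with cond:
        missed = wait_with_timer(cond, lambda: len(items) > 1, 0.3)
    return got, missed
```

Here `demo()` should return `(True, False)`: the first wait ends when the producer notifies, the second ends when its timer fires.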

Ioan
+1  A: 

This seems tricky to do in Python, but here is one solution. It relies on spawning additional threads, but it doesn't use polling, and it ensures that the original thread is woken up as soon as the timeout expires or as soon as the original wait() returns.

Note: The following code (Python 2) includes a test case that exercises both outcomes: the wait ending because of a timeout and the wait ending because of a notification.

from thread import start_new_thread
from threading import Condition, Timer

class ConditionWithoutPolling(object):
    """Implements wait() with a timeout without polling.  Wraps the Condition
    class."""
    def __init__(self, condition):
        self.condition = condition
        self.wait_timeout_condition = Condition()

    def wait(self, timeout=None):
        """Same as Condition.wait() but it does not use a poll-and-sleep method
        to implement timeouts.  Instead, if a timeout is requested, two new
        threads are spawned to implement a non-poll-and-sleep method."""
        if timeout is None:
            # just use the original implementation if no timeout is requested
            self.condition.wait()
            return
        else:
            # this new boolean will tell us whether we are done waiting or not
            done = [False]

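            # wait on the original condition in a new thread
            # NOTE: this relies on the Condition wrapping a plain Lock (as in
            # test() below), which any thread may release; with an RLock,
            # wait() would raise in the helper thread, which does not own it
            start_new_thread(self.wait_on_original, (done,))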

            # wait for a timeout (without polling) in a new thread
            Timer(timeout, lambda : self.wait_timed_out(done)).start()

            # wait for EITHER of the previous threads to stop waiting
            with self.wait_timeout_condition:
                while not done[0]:
                    self.wait_timeout_condition.wait()

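    def wait_on_original(self, done):
        """Waits on the original Condition and signals wait_is_over when done."""
        self.condition.wait()
        if done[0]:
            # the timeout already fired and the caller has moved on, so give
            # the lock back instead of exiting this thread while holding it
            self.condition.release()
            return
        self.wait_is_over(done)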

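    def wait_timed_out(self, done):
        """Called when the timeout time is reached."""
        # we must re-acquire the lock we were waiting on before we can return
        self.condition.acquire()
        if done[0]:
            # a notification already ended the wait; do not keep the lock
            self.condition.release()
            return
        self.wait_is_over(done)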

    def wait_is_over(self, done):
        """Modifies done to indicate that the wait is over."""
        done[0] = True
        with self.wait_timeout_condition:
            self.wait_timeout_condition.notify()

    # wrap Condition methods since it wouldn't let us subclass it ...
    def acquire(self, *args):
        # pass the result through so non-blocking acquire() calls still work
        return self.condition.acquire(*args)
    def release(self):
        self.condition.release()
    def notify(self):
        self.condition.notify()
    def notify_all(self):
        self.condition.notify_all()
    def notifyAll(self):
        self.condition.notifyAll()

def test(wait_timeout, wait_sec_before_notification):
    import time
    from threading import Lock
    lock = Lock()
    cwp = ConditionWithoutPolling(Condition(lock))
    start = time.time()

    def t1():
        with lock:
            print 't1 has the lock, will wait up to %f sec' % (wait_timeout,)
            cwp.wait(wait_timeout)
        time_elapsed = time.time() - start
        print 't1: alive after %f sec' % (time_elapsed,)        

    # this thread will acquire the lock and then conditionally wait for up to 
    # timeout seconds and then print a message 
    start_new_thread(t1, ())

    # wait until it is time to send the notification and then send it
    print 'main thread sleeping (will notify in %f sec)' % (wait_sec_before_notification,)
    time.sleep(wait_sec_before_notification)
    with lock:
        cwp.notifyAll()
        print 'notification sent, will continue in 2sec'
    time.sleep(2.0) # give the other thread time to finish before exiting

if __name__ == "__main__":
    print 'test wait() ending before the timeout ...'
    test(2.0, 1.0)

    print '\ntest wait() ending due to the timeout ...'
    test(2.0, 4.0)
David Underhill
It would be interesting to know how much this helps performance-wise, if at all, since it trades polling for some extra thread creation. I guess that if you're polling a lot, it may be a worthwhile trade-off. With some more work, perhaps the timers could all be serviced by a single thread (this might help if you call wait() with a timeout frequently).
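
The single-thread idea could look something like the sketch below. `TimeoutService` and `wait_until` are hypothetical names, not part of the threading module, and the design is an assumption rather than a tested replacement for the class above. The service thread itself uses a timed `Condition.wait()`, so on an interpreter where that polls, only this one thread pays the cost; the waiters block with no timeout.

```python
import heapq
import itertools
import threading
import time

class TimeoutService(object):
    """One daemon thread that fires callbacks at their deadlines."""

    def __init__(self):
        self._cond = threading.Condition()
        self._heap = []                 # entries: [deadline, seq, callback]
        self._seq = itertools.count()   # tie-breaker; callbacks are never compared
        thread = threading.Thread(target=self._run)
        thread.daemon = True
        thread.start()

    def schedule(self, delay, callback):
        """Run callback() after delay seconds; returns a handle for cancel()."""
        with self._cond:
            entry = [time.time() + delay, next(self._seq), callback]
            heapq.heappush(self._heap, entry)
            self._cond.notify()         # the new deadline may be the earliest
        return entry

    def cancel(self, entry):
        """Disarm an entry; it is discarded when its deadline is reached."""
        entry[2] = None

    def _run(self):
        while True:
            with self._cond:
                while True:
                    if not self._heap:
                        self._cond.wait()       # nothing scheduled yet
                        continue
                    remaining = self._heap[0][0] - time.time()
                    if remaining <= 0:
                        break
                    self._cond.wait(remaining)  # until the earliest deadline
                callback = heapq.heappop(self._heap)[2]
            if callback is not None:
                callback()              # run outside the service lock

def wait_until(cond, predicate, timeout, service):
    """Like a timed wait on cond (held by the caller), using the service."""
    expired = [False]

    def on_timeout():
        with cond:
            expired[0] = True
            cond.notify_all()

    handle = service.schedule(timeout, on_timeout)
    while not predicate() and not expired[0]:
        cond.wait()
    service.cancel(handle)
    return predicate()
```

One trade-off of this design: callbacks run on the service thread and take the waiter's lock, so a lock that is held for a long time delays every later timeout.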
David Underhill
Interesting solution to the problem, though it seems very expensive to create 2 threads for every wait on a condition. I'll have a play around with it, though.
Biohazard