I'm working on a subclass of threading.Thread whose methods, when called, run in the thread represented by the object they are called on rather than in the caller's thread. I do this with decorators on the target methods that place the call in a collections.deque, and the run method processes the deque.

The run method uses a while not self.__stop: statement and a threading.Condition object to wait for a call to be placed in the deque, and then calls self.__process_calls. The else part of the while loop makes a final call to __process_calls. Once self.__stop is set, an exception is raised on any attempt to call one of the 'callable' methods from another thread.

The problem is that __process_calls fails to return unless its last statement is a print, which I discovered during debugging. I've tried a = 1 and an explicit return, but neither works. With any print statement as the final statement of the function, though, it returns and the thread doesn't hang. Any ideas what's going on?

EDIT: David Zaslavsky pointed out that the print works because it takes a while, and I've confirmed that.

The code's a little long, but hopefully my explanation above is clear enough to help understand it.

import threading
import collections    

class BrokenPromise(Exception): pass    
class CallableThreadError(Exception): pass    
class CallToNonRunningThreadError(CallableThreadError): pass   


class Promise(object):
    def __init__(self, deque, condition):
        self._condition = condition
        self._deque = deque

    def read(self, timeout=None):
        if not self._deque:
            with self._condition:
                if timeout:
                    self._condition.wait(timeout)
                else:
                    self._condition.wait()
        if self._deque:
            value = self._deque.popleft()
            del self._deque
            del self._condition
            return value
        else:
            raise BrokenPromise

    def ready(self):
        return bool(self._deque) 

class CallableThread(threading.Thread):
    def __init__(self, *args, **kwargs): 
        # __enqueued_calls is used to store tuples that encode a function call.
        # It is processed by the run method
        self.__enqueued_calls = collections.deque()
        # __enqueue_call_permission is for callers to signal that they have
        # placed something on the queue
        self.__enqueue_call_permission = threading.Condition()
        self.__stop = False
        super(CallableThread, self).__init__(*args, **kwargs) 

    @staticmethod
    def blocking_method(f): 
        u"""A decorator function to implement a blocking method on a thread""" 
        # the returned function enqueues the decorated function and blocks
        # until the decorated function is called and returns. It then returns
        # the value unmodified. The code in register runs in the calling thread
        # and the decorated method runs in the thread that it is called on
        f = CallableThread.nonblocking_method_with_promise(f)
        def register(self, *args, **kwargs):
            p = f(self, *args, **kwargs)
            return p.read()
        return register

    @staticmethod 
    def nonblocking_method_with_promise(f):
        u"""A decorator function to implement a non-blocking method on a
        thread
        """ 
        # the returned function enqueues the decorated function and returns a
        # Promise object. The code in register runs in the calling thread
        # and the decorated method runs in the thread that it is called on.
        def register(self, *args, **kwargs): 
            call_complete = threading.Condition() 
            response_deque = collections.deque()
            self.__push_call(f, args, kwargs, response_deque, call_complete)
            return Promise(response_deque, call_complete)
        return register

    @staticmethod
    def nonblocking_method(f):
        def register(self, *args, **kwargs):
            self.__push_call(f, args, kwargs)
        return register

    def run(self):        
        while not self.__stop:  # while we've not been killed 
            with self.__enqueue_call_permission:
                # get the condition so that we can wait on it if we need to.
                if not self.__enqueued_calls: 
                    self.__enqueue_call_permission.wait() 
            self.__process_calls()
        else:
            # if we exit because self.__stop is set, finish processing
            # the pending calls if there are any
            self.__process_calls()

    def stop(self): 
        u""" Signal the thread to stop"""
        with self.__enqueue_call_permission:
            # we do this in case the run method is stuck waiting on an update
            self.__stop = True
            self.__enqueue_call_permission.notify()

    def __process_calls(self):
        print "processing calls"
        while self.__enqueued_calls:
            ((f,  args, kwargs),
            response_deque, call_complete) = self.__enqueued_calls.popleft()
            if call_complete:
                with call_complete:
                    response_deque.append(f(self, *args, **kwargs)) 
                    call_complete.notify()
            else:
                f(self, *args, **kwargs)
        # this is where you place the print statement if you want to see the
        # behavior        

    def __push_call(self, f, args, kwargs, response_deque=None,
                    call_complete=None):
        if self.__stop:
            raise CallToNonRunningThreadError(
                  "This thread is no longer accepting calls")
        with self.__enqueue_call_permission:
            self.__enqueued_calls.append(((f, args, kwargs),
                                           response_deque, call_complete))
            self.__enqueue_call_permission.notify()


#if __name__=='__main__':      i lost the indent on the following code in copying but
#it doesn't matter in this context
class TestThread(CallableThread): 
    u"""Increment a counter on each call and print the value""" 
    counter = 0

    @CallableThread.nonblocking_method_with_promise
    def increment(self): 
        self.counter += 1
        return self.counter

class LogThread(CallableThread):

    @CallableThread.nonblocking_method
    def log(self, message):
        print message

l = LogThread()
l.start()
l.log("logger started")
t = TestThread() 
t.start()
l.log("test thread started")
p = t.increment()
l.log("promise aquired")
v = p.read()
l.log("promise read")
l.log("{0} read from promise".format(v))
l.stop()
t.stop()
l.join()
t.join()
+1  A: 
  1. __process_calls is modifying __enqueued_calls without owning the lock. This may be creating a race condition.

  2. Edit: deque may be "threadsafe" (i.e. not corrupted by concurrent accesses), but checking its state should still be done while holding the lock.

  3. The stop condition is also not safe: __stop is tested before the lock is taken.
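
Point 2 also seems to apply to the question's Promise.read, which tests the deque before taking the condition. The following is only a sketch of how the check and the wait could share one lock acquisition, so that a notify() cannot slip in between them. SafePromise and fulfil are hypothetical names used for illustration; they are not part of the original code.

```python
import collections
import threading


class BrokenPromise(Exception):
    pass


class SafePromise(object):
    """Hypothetical variant of the question's Promise (illustrative only)."""

    def __init__(self):
        self._deque = collections.deque()
        self._condition = threading.Condition()

    def fulfil(self, value):
        # Producer side: append and notify while holding the lock.
        with self._condition:
            self._deque.append(value)
            self._condition.notify()

    def read(self, timeout=None):
        # Consumer side: test the deque only while holding the lock, so the
        # producer cannot append and notify between the test and the wait.
        with self._condition:
            if timeout is None:
                while not self._deque:
                    self._condition.wait()
            elif not self._deque:
                self._condition.wait(timeout)
            if not self._deque:
                raise BrokenPromise()
            return self._deque.popleft()
```

With this shape, a value delivered before read is called is returned immediately, and one delivered later wakes the waiting reader.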

Comments inline:

def run(self):        
    while not self.__stop:  # while we've not been killed 
        with self.__enqueue_call_permission:
            # get the condition so that we can wait on it if we need to.
            ### should be checking __stop here, it could have been modified before
            ### you took the lock.
            if not self.__enqueued_calls: 
                self.__enqueue_call_permission.wait() 
        self.__process_calls()
    else:
        # if we exit because self.__stop is set, finish processing
        # the pending calls if there are any
        self.__process_calls()
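
A minimal sketch of a run loop that tests both the stop flag and the queue while holding the lock, assuming a simplified worker rather than the full CallableThread. Worker, push, _stopping and results are illustrative names, not the question's API.

```python
import collections
import threading


class Worker(threading.Thread):
    """Hypothetical, simplified stand-in for CallableThread."""

    def __init__(self):
        super(Worker, self).__init__()
        self._calls = collections.deque()
        self._cond = threading.Condition()
        self._stopping = False
        self.results = []

    def push(self, f, *args):
        with self._cond:
            if self._stopping:
                raise RuntimeError("worker is no longer accepting calls")
            self._calls.append((f, args))
            self._cond.notify()

    def stop(self):
        with self._cond:
            self._stopping = True
            self._cond.notify()

    def run(self):
        while True:
            with self._cond:
                # Both predicates are checked under the lock, so a stop()
                # or push() issued just before we wait cannot be missed.
                while not self._calls and not self._stopping:
                    self._cond.wait()
                if not self._calls:
                    return  # stop requested and nothing left to process
                f, args = self._calls.popleft()
            # Run the call outside the lock so callers are not blocked.
            self.results.append(f(*args))
```

Because the loop only returns when the queue is empty, calls queued before stop() are still processed, which matches the intent of the else clause in the original run method.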
bstpierre
I tried it and nope. The manual says that the `popleft` method on a `deque` is threadsafe. `__enqueue_call_permission` is really just a way for `__push_call` to signal to `run` that there is a new call on the `deque`. I should probably rename it.
aaronasterling
It was 3. I'm still not sure how it works. My mental run-through says that the mistake would result in `run` hanging on the call to `__enqueue_call_permission.wait()`, but that's not at all the behavior that manifested. At any rate, it's fixed now.
aaronasterling
I had the same thought. It looked wrong to me, but I couldn't figure out a failure scenario that matched your symptoms. Glad it's fixed.
bstpierre