I'm writing a threading.Thread subclass that implements 'callable' methods.

Normally, a method defined on a thread object runs in whichever thread calls it. The methods here instead run in the thread they are called on. Depending on which decorator creates the method, the call either blocks the calling thread until the result is ready or returns immediately with a means of getting the result later. The code is:

import threading 
import collections 


class CallableThread(threading.Thread): 

    def __init__(self, *args, **kwargs): 
        # _enqueued_calls stores tuples that encode a function call. It is processed by the run method
        self._enqueued_calls = collections.deque() 
        # _enqueue_call_permission is for callers to signal that they have placed something on the queue 
        self._enqueue_call_permission = threading.Condition()                 
        super(CallableThread, self).__init__(*args, **kwargs) 

    @staticmethod 
    def blocking_method(f): 
        u"""A decorator function to implement a blocking method on a thread"""
        # the returned function enqueues the decorated function and blocks until the decorated function 
        # is called and returns. It then returns the value unmodified. The code in register runs 
        # in the calling thread and the decorated method runs in thread that it is called on 
        def register(self, *args, **kwargs): 
            call_complete = threading.Condition() 
            response = collections.deque() 
            with self._enqueue_call_permission: 
                self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete)) 
                self._enqueue_call_permission.notify() 
            with call_complete: 
                while not response:  # re-check after every wakeup so popleft never sees an empty deque
                    call_complete.wait() 
            return response.popleft() 
        return register 

    @staticmethod 
    def nonblocking_method(f): 
        u"""A decorator function to implement a non-blocking method on a thread""" 
        # the returned function enqueues the decorated function and returns a tuple consisting of a condition 
        # to wait for and a deque to read the result out of. The code in register runs in the calling thread 
        # and the decorated method runs in thread that it is called on 
        def register(self, *args, **kwargs): 
            call_complete = threading.Condition() 
            response = collections.deque() 
            with self._enqueue_call_permission: 
                # enqueue the same deque and condition that are returned to the caller
                self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete))
                self._enqueue_call_permission.notify() 
            return call_complete, response 
        return register     

    def run(self):
        self._run = True
        while self._run:  # while we've not been killed
            with self._enqueue_call_permission:  # get the condition so we can wait on it.
                # check the queue and the stop flag while holding the lock,
                # so a notify from a caller or from stop() cannot be missed
                while self._run and not self._enqueued_calls:
                    self._enqueue_call_permission.wait()  # wait if we need to
            while self._enqueued_calls:
                ((f, self, args, kwargs), response_deque, call_complete) = self._enqueued_calls.popleft()
                with call_complete:
                    response_deque.append(f(self, *args, **kwargs))
                    call_complete.notify()


    def stop(self):
        u"""Signal the thread to stop and wake it if it is waiting"""
        with self._enqueue_call_permission:
            self._run = False
            self._enqueue_call_permission.notify()



if __name__=='__main__': 
    class TestThread(CallableThread): 
        u"""Increment a counter on each call and print the value""" 
        counter = 0 
        @CallableThread.blocking_method 
        def increment(self, tag): 
            print("{0} FROM: {1}".format(self.counter, tag))
            self.counter += 1 

    class ThreadClient(threading.Thread): 
        def __init__(self, callable_thread, tag): 
            self.callable_thread = callable_thread 
            self.tag = tag 
            super(ThreadClient, self).__init__() 

        def run(self): 
            for i in range(0, 4): 
                self.callable_thread.increment(self.tag) 

    t = TestThread() 
    t.start() 
    clients = [ThreadClient(t, i) for i in range(0, 10)] 
    for client in clients: 
        client.start() 
##        client.join() 
    for client in clients: 
        client.join() 
    t.stop()

As you can no doubt see, I'm using static methods as decorators. The decorators take the method that they are applied to and return a function that enqueues the decorated function along with the arguments it is called with, a threading.Condition instance to notify on completion and a collections.deque instance to write the result to.
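
To illustrate how a caller might consume the (condition, deque) pair that a non-blocking method returns, here is a minimal sketch. The helper name `wait_for_result` and its `timeout` parameter are hypothetical and not part of the code above, and a plain thread (`fake_worker`) stands in for the CallableThread run loop.

```python
import collections
import threading


def wait_for_result(call_complete, response, timeout=None):
    """Hypothetical helper: wait for a value in `response` and return it.

    Returns None if nothing has arrived once the wait ends (an assumption
    made for this sketch; a real design might raise instead).
    """
    with call_complete:
        # Only wait if the worker has not already stored its result.
        if not response:
            call_complete.wait(timeout)
        return response.popleft() if response else None


def fake_worker(call_complete, response, value):
    """Stand-in for the worker's run loop: store a result, then notify."""
    with call_complete:
        response.append(value)
        call_complete.notify()


call_complete = threading.Condition()
response = collections.deque()
worker = threading.Thread(target=fake_worker, args=(call_complete, response, 42))
worker.start()
result = wait_for_result(call_complete, response, timeout=5)
worker.join()
print(result)  # prints 42
```

Checking the deque before waiting matters here: if the worker finishes before the caller acquires the condition, the notification has already been sent and a bare wait would never be woken.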

Any suggestions? I'm especially interested in naming, architectural points and robustness.

EDIT: some changes that I made based on suggestions while I was away from an interpreter broke the code so I just fixed it.

+2  A: 

You can use

def register(self, *args, **kwargs)

instead of unpacking self from args. Also, why use capitals for _RUN?
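
A small sketch of the difference, for illustration only. The decorator names `unpacking_style` and `explicit_self_style` and the `Greeter` class are made up for this example, and the wrappers simply call the function directly rather than enqueuing it.

```python
import functools


def unpacking_style(f):
    """Wrapper that takes everything in *args and unpacks self by hand."""
    @functools.wraps(f)
    def register(*args, **kwargs):
        self, args = args[0], args[1:]
        return f(self, *args, **kwargs)
    return register


def explicit_self_style(f):
    """Wrapper that names self in its own signature."""
    @functools.wraps(f)
    def register(self, *args, **kwargs):
        return f(self, *args, **kwargs)
    return register


class Greeter(object):
    @unpacking_style
    def hello(self, name):
        return "hello " + name

    @explicit_self_style
    def goodbye(self, name):
        return "goodbye " + name


g = Greeter()
print(g.hello("world"))    # prints hello world
print(g.goodbye("world"))  # prints goodbye world
```

Both behave the same when called on an instance; the second just makes it obvious that the wrapper is meant to be used as a method.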

Ivo van der Wijk
Good point on not unpacking self. Long story on _RUN being in caps: it started off as a global in another module that applied to all threads, and I always capitalize such variables. I'll go ahead and change it because it does seem unnatural now that you mention it.
aaronasterling