I'm writing a threading.Thread
subclass that implements 'callable' methods.
Normally with an arbitrary method on a thread, the method runs in whatever thread calls it. These methods run in the thread that they are called on and either block further execution in the calling thread or return a means to get the result depending on which decorator is used to create the method. The code is:
import threading
import collections
class CallableThread(threading.Thread):
def __init__(self, *args, **kwargs):
#_enqueued_calls is used to store tuples that encode a functin call. It is processed by the run method
self._enqueued_calls = collections.deque()
# _enqueue_call_permission is for callers to signal that they have placed something on the queue
self._enqueue_call_permission = threading.Condition()
super(CallableThread, self).__init__(*args, **kwargs)
@staticmethod
def blocking_method(f):
u"""A decorator functino to implement a blocking method on a thread"""
# the returned function enqueues the decorated function and blocks until the decorated function
# is called and returns. It then returns the value unmodified. The code in register runs
# in the calling thread and the decorated method runs in thread that it is called on
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response = collections.deque()
with self._enqueue_call_permission:
self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete))
self._enqueue_call_permission.notify()
with call_complete:
if not response:
call_complete.wait()
return response.popleft()
return register
@staticmethod
def nonblocking_method(f):
u"""A decorator function to implement a non-blocking method on a thread"""
# the returned function enqueues the decorated function and returns a tuple consisting of a condition
# to wait for and a deque to read the result out of. The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response = collections.deque()
with self._enqueue_call_permission:
self._enqueued_calls.append(((f, self, args, kwargs), None, None))
self._enqueue_call_permission.notify()
return call_complete, response
return register
def run(self):
self._run = True
while self._run: # while we've not been killed
with self._enqueue_call_permission: # get the condition so we can wait on it.
if not self._enqueued_calls:
self._enqueue_call_permission.wait() # wait if we need to
while self._enqueued_calls:
((f, self, args, kwargs), response_deque, call_complete) = self._enqueued_calls.popleft()
with call_complete:
response_deque.append(f(self, *args, **kwargs))
call_complete.notify()
def stop(self):
u""" Signal the thread to stop"""
self._run = False
if __name__=='__main__':
class TestThread(CallableThread):
u"""Increment a counter on each call and print the value"""
counter = 0
@CallableThread.blocking_method
def increment(self, tag):
print "{0} FROM: {1}".format(self.counter, tag)
self.counter += 1
class ThreadClient(threading.Thread):
def __init__(self, callable_thread, tag):
self.callable_thread = callable_thread
self.tag = tag
super(ThreadClient, self).__init__()
def run(self):
for i in range(0, 4):
self.callable_thread.increment(self.tag)
t = TestThread()
t.start()
clients = [ThreadClient(t, i) for i in range(0, 10)]
for client in clients:
client.start()
## client.join()
for client in clients:
client.join()
t.stop()
As you can no doubt see, I'm using static methods as decorators. The decorators take the method that they are applied to and return a function that enqueues the decorated function along with the arguments it is called with, a threading.Condition instance to notify
of completion and a collections.deque
instance to right the result too.
Any suggestions? I'm especially interested in naming, architectural points and robustness
EDIT: some changes that I made based on suggestions while I was away from an interpreter broke the code so I just fixed it.