I want to be able to join() the Queue class but timeouting after some time if the call hasn't returned yet. What is the best way to do it? Is it possible to do it by subclassing queue\using metaclass?
A:
At first, you should ensure that all your working threads in the queue exit with task_done()
To implement a timeout functionality with Queue
, you can wrap the Queue's code in a Thread and add a timeout for this Thread using Thread.join([timeout])
untested example to outline what I suggest
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
def queuefunc():
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.setDaemon(True)
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
t = Thread(target=queuefunc)
t.start()
t.join(100) # timeout applies here
tuergeist
2009-10-14 07:30:41
+2
A:
Subclassing Queue
is probably the best way. Something like this should work (untested):
def join_with_timeout(self, timeout):
self.all_tasks_done.acquire()
try:
endtime = time() + timeout
while self.unfinished_tasks:
remaining = endtime - time()
if remaining <= 0.0:
raise NotFinished
self.all_tasks_done.wait(remaining)
finally:
self.all_tasks_done.release()
Lukáš Lalinský
2009-10-14 07:33:14
Thanks! Where did you get info about all_task_done? I looked in http://docs.python.org/library/queue.html#module-Queue but I don't see any mention of that memeber...
noam
2009-10-14 07:42:48
You can read the source code for Queue. It has a `timeout` parameter implemented for `put` and `get`, it was easy enough to extend `join` to use a similar approach.
Lukáš Lalinský
2009-10-14 07:45:09
Hmm, smart solution ;)
tuergeist
2009-10-14 12:59:37
A:
The join() method is all about waiting for all the tasks to be done. If you don't care whether the tasks have actually finished, you can periodically poll the unfinished task count:
stop = time() + timeout
while q.unfinished_tasks and time() < stop:
sleep(1)
This loop will exist either when the tasks are done or when the timeout period has elapsed.
Raymond
Raymond Hettinger
2010-08-18 20:20:30