I am experimenting with the new multiprocessing module in Python 2.6. I am creating several processes, each with its own multiprocessing.JoinableQueue instance. Each process spawns one or more worker threads (subclasses of threading.Thread) which share the JoinableQueue instance (passed in through each Thread's __init__ method). It generally seems to work, but occasionally and unpredictably it fails with the following error:

  File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run
    self.queue.task_done()
  File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

My queue get() and task_done() calls come right after each other (roughly as sketched below), so the counts should match. Anecdotally, this seems to occur only when the work done between the get() and the task_done() is VERY quick. Inserting a small time.sleep(0.01) seems to alleviate the problem.
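Simplified, each worker thread looks something like this (illustrative only, not my exact script):

    import threading

    class Worker(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue            # the shared multiprocessing.JoinableQueue

        def run(self):
            while True:
                item = self.queue.get()   # block until work arrives
                # ... very quick work on item ...
                self.queue.task_done()    # immediately follows the get()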

Any ideas what is going on? Can I use a multiprocessing Queue with threads instead of the more traditional Queue.Queue?

Thanks!

-brian

+2  A: 

You should pass Queue objects as arguments to the target function.

Example from multiprocessing's documentation:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.

J.F. Sebastian
A: 

Thanks for the quick response. I am passing the multiprocessing.Queue instances as arguments to each Process, as you illustrate. The failure seems to occur in the threads. I am creating them by subclassing threading.Thread and passing the queue to each thread instance's __init__ method (roughly as sketched below), which seems to be the accepted way to pass Queues to thread subclasses. My only thought is that multiprocessing Queues may not be compatible with threads (although they are supposedly thread-safe).
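In other words, the wiring is roughly this (simplified; Worker is the threading.Thread subclass sketched in the question):

    from multiprocessing import Process, JoinableQueue

    def process_main(queue):
        # each process spawns a couple of worker threads sharing the same queue
        workers = [Worker(queue) for _ in range(2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

    if __name__ == '__main__':
        q = JoinableQueue()
        p = Process(target=process_main, args=(q,))
        p.start()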

+3  A: 

I haven't experimented with multiprocessing in 2.6 yet, but I played a lot with pyprocessing (as it was called in 2.5).

I can see that you want a number of processes, each spawning its own set of threads.

Since you are already using the multiprocessing module, I suggest a multi-process rather than a multi-thread approach; you will hit fewer problems such as deadlocks.

Create a queue object. http://pyprocessing.berlios.de/doc/queue-objects.html

For creating a multi-process environment, use a pool: http://pyprocessing.berlios.de/doc/pool-objects.html which will manage the worker processes for you. You can then submit tasks to the workers asynchronously or synchronously, and you can also add a callback for each task if required (see the sketch below). But remember that the callback is a common piece of code and should return immediately (as mentioned in the documentation).
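With the 2.6 multiprocessing module (the same API as the processing docs linked above), a minimal pool sketch with an asynchronous call and a result callback might look like this (function names are illustrative):

    from multiprocessing import Pool

    def work(item):
        # runs in a worker process
        return item * 2

    def on_result(result):
        # the callback runs in the parent process and must return quickly
        print 'got result:', result

    if __name__ == '__main__':
        pool = Pool(processes=4)
        for i in range(10):
            pool.apply_async(work, (i,), callback=on_result)
        pool.close()
        pool.join()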

Some additional info: if required, create a manager (http://pyprocessing.berlios.de/doc/manager-objects.html) to manage access to the queue object. You will have to make the queue object shared for this, but the advantage is that, once shared and managed, you can access the shared queue across the network by creating proxy objects. This lets you call methods of a centralized shared queue object as (apparently) native methods on any network node.

Here is a code example from the documentation:

It is possible to run a manager server on one machine and have clients use it from other machines (assuming that the firewalls involved allow it). Running the following commands creates a server for a shared queue which remote clients can use:

>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()

One client can access the server as follows:

>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')

If you insist on safe threaded code, PEP 371 (multiprocessing) references this project: http://code.google.com/p/python-safethread/

JV
+1  A: 

You may be running into this bug:

http://bugs.python.org/issue4660

Forest Bond