I want a long-running process to return its progress over a Queue (or something similar), which I will feed to a progress bar dialog. I also need the result when the process is completed. The test example below fails with "RuntimeError: Queue objects should only be shared between processes through inheritance".

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

I've been able to get this to work using individual Process objects (where I AM allowed to pass a Queue reference), but then I don't have a pool to manage the many processes I want to launch. Any advice on a better pattern for this?
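
For reference, this is roughly the Process-based version that does work for me (a sketch only; note that without a Pool there's no easy way to collect the "Done" return values):

    import multiprocessing

    def task(count, queue):
        for i in xrange(count):
            queue.put("%d mississippi" % i)

    def main():
        q = multiprocessing.Queue()
        # A Queue passed to a Process at construction time is inherited
        # by the child -- this is the "inheritance" the RuntimeError
        # is asking for.
        procs = [multiprocessing.Process(target=task, args=(x, q))
                 for x in range(10)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        while not q.empty():
            print q.get()

    if __name__ == "__main__":
        main()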

A: 

Making q global works...:

import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

If you need multiple queues, e.g. to avoid mixing up the progress of the various pool processes, a global list of queues should work (of course, each process will then need to know what index in the list to use, but that's OK to pass as an argument;-).
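
Untested sketch of that multi-queue variant, where the index is simply each task's position in the input list (like the single global q, this relies on the pool's children inheriting the module globals at fork time):

    import multiprocessing, time

    NUM_TASKS = 10
    queues = [multiprocessing.Queue() for _ in range(NUM_TASKS)]

    def task(args):
        count, index = args
        q = queues[index]   # each task reports on its own queue
        for i in xrange(count):
            q.put("%d mississippi" % i)
        return "Done"

    def main():
        pool = multiprocessing.Pool()
        result = pool.map_async(task, [(x, x) for x in range(NUM_TASKS)])
        time.sleep(1)
        for index, q in enumerate(queues):
            while not q.empty():
                print "task %d: %s" % (index, q.get())
        print result.get()

    if __name__ == "__main__":
        main()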

Alex Martelli
Will this work if the "task" is defined in a different module or package? The example code is very simplified. The real program has an MVC architecture where a producer-consumer pipeline is set up across multiple cores (the model) and it needs to send progress updates to the wxPython GUI (the View).
David
@David, you can try; if your real code doesn't work in this simple way, you'll need to move up a notch in complexity and go for a Manager (which can give you proxies to Queues, etc).
Alex Martelli
This doesn't seem to work at all: q never returns anything, and q.empty() is always True on my machine. Even if I increase the sleep call to 10 seconds, which should be more than enough time for the tasks to put a few messages on the queue, q.empty() still returns True.
David
@David, by "This", do you mean the code I posted in my A? Because that code works fine for me on a dual-core macbook with OSX 10.5, Python 2.6.5 or 2.7. What's your platform?
Alex Martelli
Yes, I copied the code you posted, and while the results are returned (I get ten "Done" strings in a list), nothing is ever returned by the Queue. The debugger shows that q.empty() is always True. Windows 7, ActivePython 2.6.5.12.
David
@David, it does work fine on the Mac, as I said (no Windows to check there). Ah well, then it looks like a Manager is the only viable approach for you.
Alex Martelli
@Alex Thanks. I'll look into Managers.
David
+1  A: 

The following code seems to work:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Note that the Queue is obtained from manager.Queue() rather than from multiprocessing.Queue(). The manager's queue proxy is picklable, so it can be passed to the pool's workers as an ordinary argument, which a plain multiprocessing.Queue cannot. Thanks Alex for pointing me in this direction.
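
For the progress-bar use itself, polling the queue until the result is ready seems nicer than sleeping for a fixed time; a rough sketch of that loop:

    import multiprocessing, time
    import Queue  # only for the Empty exception

    def task(args):
        count, q = args
        for i in xrange(count):
            q.put("%d mississippi" % i)
            time.sleep(0.1)  # pretend each step takes a while
        return "Done"

    def main():
        manager = multiprocessing.Manager()
        q = manager.Queue()
        pool = multiprocessing.Pool()
        result = pool.map_async(task, [(x, q) for x in range(10)])
        while not result.ready():          # poll instead of a fixed sleep
            try:
                print q.get(timeout=0.1)   # a GUI would update its gauge here
            except Queue.Empty:
                pass
        while not q.empty():               # drain any remaining messages
            print q.get()
        print result.get()

    if __name__ == "__main__":
        main()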

David