views: 796
answers: 4

What's the best way to wait (without spinning) until something is available in either one of two (multiprocessing) Queues, where both reside on the same system?

+2  A: 

You could use something like the Observer pattern, wherein Queue subscribers are notified of state changes.

In this case, you could have your worker thread designated as a listener on each queue; whenever it receives a ready signal it can work on the new item, and otherwise it sleeps.
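A minimal sketch of that idea, assuming we wrap the queues ourselves (the `NotifyingQueue` wrapper and `wait_for_item` helper below are hypothetical, not part of the stdlib):

```python
import multiprocessing
from queue import Empty  # Queue.Empty on Python 2

class NotifyingQueue(object):
    """Wrap a multiprocessing.Queue and set a shared Event on put,
    so one listener can sleep on the Event instead of polling."""

    def __init__(self, ready):
        self._queue = multiprocessing.Queue()
        self._ready = ready

    def put(self, item):
        self._queue.put(item)
        self._ready.set()  # notify the listener

    def get_nowait(self):
        return self._queue.get_nowait()

def wait_for_item(queues, ready):
    """Block until any of the queues has an item, then return it."""
    while True:
        ready.clear()  # clear *before* draining, to avoid lost wakeups
        for q in queues:
            try:
                return q.get_nowait()
            except Empty:
                pass
        # The timeout is a safety net: put() hands items to a background
        # feeder thread, so data can arrive slightly after set().
        ready.wait(timeout=0.1)
```

With a `multiprocessing.Event` the same wrapper works across processes, though the drawback raised below still applies: the consumer has to drain the queues itself.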

uniquesnowflake8
Well, the `get` is destructive, so you can't really do observation on the queue itself as GoF describe it. The dequeue-ing thread would have to be the "observed" -- I was hoping for less overhead than two additional threads.
cdleary
Also, if I wanted a single point of access for the calling process (like in `select`) I would need a thread-safe queue on top of those two threads.
cdleary
+5  A: 

It doesn't look like there's an official way to handle this yet. Or at least, not based on this:

You could try something like what this post is doing -- accessing the underlying pipe filehandles:

and then use select.

ars
+1 Wow, nice finds! My Google-fu appears to be weak...
cdleary
+2  A: 

Using threads that forward incoming items into a single Queue, which you then wait on, seems like the practical choice when using multiprocessing in a platform-independent manner.

Avoiding the threads requires handling low-level pipes/FDs, which is both platform-specific and hard to do consistently with the higher-level API.
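The thread-forwarding approach can be sketched like this (queue names are illustrative):

```python
import multiprocessing
import threading

def forward(source, target):
    """Blocking-get from `source` forever, pushing each item to `target`."""
    while True:
        target.put(source.get())

queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue()
singlequeue = multiprocessing.Queue()  # a plain Queue.Queue would also work

for q in (queue1, queue2):
    t = threading.Thread(target=forward, args=(q, singlequeue))
    t.daemon = True  # don't keep the process alive for the forwarders
    t.start()

# One blocking call now waits on both queues:
# item = singlequeue.get()
```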

Alternatively, you would need Queues with the ability to register callbacks, which I think is the proper higher-level interface to aim for. I.e. you would write something like:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Maybe the multiprocessing package could grow this API, but it's not there yet. The concept works well with py.execnet, which uses the term "channel" instead of "queue"; see here http://tinyurl.com/nmtr4w

hpk42
That would be a very nice interface! (Though clearly there's benefit to keeping the stdlib interfaces tight, as Jesse mentions in the bug report @ars referenced.)
cdleary
True, but the current Queue public API doesn't handle your use case, which I think is a common one.
hpk42
If it's "common" - file a bug report + patch (with tests for the love of pete) on bugs.python.org and I can evaluate it for 2.7/3.x
jnoller
A: 

Actually you can use multiprocessing.Queue objects in select.select, i.e.

  que = multiprocessing.Queue()
  readable, _, _ = select.select([que._reader], [], [])

will return once que is ready to be read from.

No documentation about it, though. I was reading the source code of the multiprocessing.queues module (on Linux it's usually something like /usr/lib/python2.6/multiprocessing/queues.py) to find this out.
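A fuller example along those lines. Note that `_reader` is an undocumented internal attribute, so this may break between Python versions, and it is POSIX-only (on Windows, select.select only accepts sockets):

```python
import multiprocessing
import select

q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
q2.put("payload")

# Block until at least one queue's underlying pipe has data to read.
readable, _, _ = select.select([q1._reader, q2._reader], [], [])

if q2._reader in readable:
    item = q2.get()  # data is already in the pipe, so this won't block long
```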

With Queue.Queue I haven't found any smart way to do this (and I would really love to).

silverado