I'm learning to use the Queue module, and am a bit confused about how a queue consumer thread can be made to know that the queue is complete. Ideally I'd like to use get() from within the consumer thread and have it throw an exception if the queue has been marked "done". Is there a better way to communicate this than by appending a sentinel value to mark the last item in the queue?

+3  A: 

Queues don't inherently have the idea of being complete or done. They can be used indefinitely. To close one when you are done, you will indeed need to put None or some other magic value at the end and write the logic to check for it, as you described. The cleanest way would probably be to subclass the Queue object.

See http://en.wikipedia.org/wiki/Queue_(data_structure) to learn more about queues in general.
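
For illustration, here is a minimal sketch of the sentinel approach with a single producer and a single consumer. The names `_DONE` and `consume` are made up for this example, and the doubling step just stands in for real work. A unique `object()` is used rather than None so that None can still be queued as ordinary data.

```python
import threading

try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name

_DONE = object()  # hypothetical sentinel; compared by identity only


def consume(q, results):
    """Process items until the sentinel arrives, then return."""
    while True:
        item = q.get()       # blocks until something is available
        if item is _DONE:    # identity check, so no real item can match
            break
        results.append(item * 2)  # placeholder for real work


q = queue.Queue()
results = []
worker = threading.Thread(target=consume, args=(q, results))
worker.start()

for n in range(5):
    q.put(n)
q.put(_DONE)   # tell the consumer that nothing more is coming
worker.join()
```

The identity test only works between threads in one process; as noted in the comments below, a sentinel sent through `multiprocessing` would be pickled and arrive as a different object.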

Matt Williamson
Otherwise cheekily known as the poison pill. :P
Jeremy Brown
I'm wrapping it in functions that connect it with iterators on either end. Using an empty list or some other simple object seems best, since it will have a unique identity when checked for with `is`. This seems like it would work well unless `multiprocessing` is being used, in which case I guess I'd need to generate a UUID or something.
intuited
No UUID. Just create a singleton object: `QueueFinished = object()`. Queue that object, and test it with `item is QueueFinished`.
Glenn Maynard
+1  A: 

A sentinel is a natural way to shut down a queue, but there are a couple things to watch out for.

First, remember that you may have more than one consumer, so you need to send a sentinel once for each running consumer, and guarantee that each consumer will only consume one sentinel, to ensure that each consumer receives its shutdown sentinel.

Second, remember that Queue defines an interface, and that when possible, code should behave regardless of the underlying Queue. You might have a PriorityQueue, or you might have some other class that exposes the same interface and returns values in some other order.

Unfortunately, it's hard to deal with both of these. To deal with the general case of different queues, a consumer that's shutting down must continue to consume values after receiving its shutdown sentinel until the queue is empty. That means that it may consume another thread's sentinel. This is a weakness of the Queue interface: it should have a Queue.shutdown call to cause an exception to be thrown by all consumers, but that's missing.

So, in practice:

  • if you're sure you're only ever using a regular Queue, simply send one sentinel per thread.
  • if you may be using a PriorityQueue, ensure that the sentinel has the lowest priority.
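
A rough sketch of both cases, assuming every queue entry is a `(priority, item)` tuple so the same consumer works with either class. The names `_DONE`, `LOWEST`, `consume` and `run` are invented for this example. Each consumer stops as soon as it sees one sentinel, so it never takes a second one.

```python
import threading

try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name

_DONE = object()         # hypothetical shutdown sentinel
LOWEST = float("inf")    # sorts after every real priority number


def consume(q, seen, lock):
    """Handle entries until this consumer receives its own sentinel."""
    while True:
        _priority, item = q.get()
        if item is _DONE:
            break            # stop at once, leaving the other sentinels alone
        with lock:
            seen.append(item)


def run(q, n_consumers, entries):
    """Start the consumers, feed them, then send one sentinel per consumer."""
    seen, lock = [], threading.Lock()
    workers = [threading.Thread(target=consume, args=(q, seen, lock))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for entry in entries:
        q.put(entry)
    for _ in workers:
        q.put((LOWEST, _DONE))   # exactly one sentinel for each consumer
    for w in workers:
        w.join()
    return sorted(seen)


entries = [(n % 3, n) for n in range(20)]
fifo_result = run(queue.Queue(), 4, entries)
prio_result = run(queue.PriorityQueue(), 4, entries)
```

With the PriorityQueue, the sentinels are queued after all the work and carry the worst priority, so a consumer can only receive one once no real entries are left in the queue.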
Glenn Maynard
A: 

The best practice way of doing this would be to have the queue itself notify a client that it has reached the 'done' state. The client can then take any action that is appropriate.

What you have suggested, periodically checking the queue to see whether it is done, would be highly undesirable. Polling is an antipattern in multithreaded programming; you should always use notifications.

EDIT:
So you're saying that the queue itself knows that it's 'done' based on some criteria and needs to notify the clients of that fact. I think you are correct, and the best way to do this is by throwing when a client calls get() and the queue is in the done state. Throwing would remove the need for a sentinel value on the client side. Internally the queue can detect that it is 'done' in any way it pleases, e.g. the queue is empty, its state was set to done, etc. I don't see any need for a sentinel value.

radman
I think you misunderstood my suggestion: I'm considering just `put`ting some unique value at the end of the queue to indicate to the receiving threads that it's the end. My question is basically "What is the best way to have a producer notify its consumers that it's done?"
intuited
updated, I think your question is still a little unclear however.
radman
Err. He's talking about the standard library Queue.Queue object. It doesn't *have* a "done" state, and in many cases it's nontrivial to add it externally. It definitely *should* have a `close` method, which would cause all blocking and future `get` calls to raise an exception; but it doesn't, so that doesn't help him.
Glenn Maynard
A: 

A Queue is a FIFO (first in, first out) buffer, so remember that the consumer can be faster than the producer. When a consumer thread detects that the queue is empty, it normally takes one of the following actions:

  1. Send to API: switch to the next thread.
  2. Send to API: sleep a few ms and then check the queue again.
  3. Send to API: wait on an event (such as a new message in the queue).

If you want the consumer threads to terminate after the job is complete, then put a sentinel value in the queue to terminate the task.

GJ
A: 

original (most of this has changed; see updates below)

Based on some of the suggestions (thanks!) of Glenn Maynard and others, I decided to roll up a descendant of Queue.Queue that implements a close method. It's available in the form of a primitive (unpackaged) module. I'll clean this up a bit and package it properly when I have a bit more time. For now the module only contains the CloseableQueue class and the Closed exception class. I'm planning to expand it to also include subclasses of Queue.LifoQueue and Queue.PriorityQueue.

It's in a pretty preliminary state currently, which is to say that although it passes its test suite, I haven't actually used it for anything yet. Your mileage may vary. I'll keep this answer updated with exciting news.

The CloseableQueue class differs a bit from Glenn's suggestion in that closing the queue will prevent future puts, but not prevent future gets until the queue is emptied. This made the most sense to me; it seemed like functionality to clear the queue could be added as a separate mixin* that would be orthogonal to the closeability functionality. So basically with CloseableQueue, by closing the queue you indicate that the last element has been put. There's also an option to do this atomically by passing last=True to the final put call. Subsequent calls to put, and subsequent calls to get once the queue is emptied, as well as outstanding blocked calls matching those descriptions, will raise the Closed exception.
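
To make those semantics concrete, here is a stripped-down sketch. It is not the CloseableQueue module's code: `ClosableFifo` and `drain` are hypothetical names, and it is built on a deque and a `threading.Condition` rather than by overriding Queue.get and Queue.put. It only mirrors the behaviour described above: close stops further puts, gets keep working until the queue is drained, and blocked getters are woken with `Closed`.

```python
import collections
import threading


class Closed(Exception):
    """Raised by put() after close, and by get() once closed and empty."""


class ClosableFifo(object):
    """Hypothetical unbounded FIFO that can be closed by its producer."""

    def __init__(self):
        self._items = collections.deque()
        self._cond = threading.Condition()
        self._closed = False

    def put(self, item, last=False):
        with self._cond:
            if self._closed:
                raise Closed
            self._items.append(item)
            if last:
                # Add the final item and close in one atomic step.
                self._closed = True
                self._cond.notify_all()
            else:
                self._cond.notify()

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()  # wake every getter that is blocked

    def get(self):
        with self._cond:
            while not self._items:
                if self._closed:
                    raise Closed     # closed and fully drained
                self._cond.wait()
            return self._items.popleft()


def drain(q):
    """Collect items until the queue reports that it is closed."""
    out = []
    while True:
        try:
            out.append(q.get())
        except Closed:
            return out


q = ClosableFifo()
for n in range(3):
    q.put(n)
q.put(3, last=True)
items = drain(q)
```

No sentinel is needed on the consumer side here; the loop in `drain` simply ends when get raises.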

This is mostly useful for situations where a single producer is generating data for one or more consumers, but it could also be useful for a multi-multi arrangement where consumers are waiting for a particular item or set of items. In particular it doesn't provide a way to determine that all of a number of producers have finished production. Getting that working would entail the provision of some way to register producers (.open()?), as well as a way to indicate that producer registration is itself closed.

Suggestions and/or code reviews are quite welcome. I haven't written a whole lot of concurrency code, but hopefully the test suite is thorough enough that the fact that the code passes it is an indication of the code's quality, rather than the suite's lack thereof. I was able to reuse a bunch of the code from the Queue module's test suite: the file itself is included in this module and used as a basis for various subclasses and routines, including regression testing. This probably (hopefully) helped to avoid complete ineptitude in the testing department. The code itself just overrides Queue.get and Queue.put with fairly minimal changes, and adds the close and closed methods.

I've sort of intentionally avoided using any new-fangled fanciness like context managers in both the code itself and in the test suite in an effort to keep the code as backwards-compatible as is the Queue module itself, which is considerably backwards indeed. I'll probably add __enter__ and __exit__ methods at some point; otherwise, the contextlib's closing function should be applicable to a CloseableQueue instance.

*: Here I use the term "mixin" loosely. As the Queue module's classes are old-style, mixins would need to be mixed using class factory functions; some restrictions apply; offer void where prohibited by Guido.

update

The CloseableQueue module now provides CloseableLifoQueue and CloseablePriorityQueue classes. I've also added some convenience functions to support iteration. Still need to rework it as a proper package. There's a class factory function to allow for convenient subclassing of other Queue.Queue-derived classes.

update 2

CloseableQueue is now available via PyPI, e.g. with

$ easy_install CloseableQueue

Comments and criticism are welcome, especially from this answer's anonymous downvoter.

intuited
To whoever downvoted this: a comment would be great, I'd love to get some criticism of what I've done here.
intuited