I want to transmit data from a Queue using Twisted. I currently use a push producer to poll the queue for items and write to the transport.

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

The problem is that the data is sent very irregularly, and if only one item is in the queue, it is never sent at all. It seems that Twisted waits until the pending data has grown to a certain size before transmitting it. Is this the right way to implement my producer? Can I force Twisted to transmit data immediately?

I've also tried using a pull producer, but Twisted never calls its resumeProducing() method. Do I have to call resumeProducing() from outside when using a pull producer?

+1  A: 

It's hard to say why your producer doesn't work well without seeing a complete example (that is, without also seeing the code that registers it with a consumer and the code which is putting items into that queue).

However, one problem you'll likely have is that if your queue is empty when resumeProducing is called, then you will write no bytes at all to the consumer. And when items are put into the queue, they'll sit there forever, because the consumer isn't going to call your resumeProducing method again.

And this generalizes to any other case where the queue does not have enough data in it to cause the consumer to call pauseProducing on your producer. As a push producer, it is your job to continue to produce data on your own until the consumer calls pauseProducing (or stopProducing).

For this particular case, that probably means that whenever you're about to put something in that queue, you should first check whether the producer is paused. If it is not paused, write the item to the consumer directly instead. Only put items in the queue while the producer is paused.
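A minimal sketch of that idea, under some assumptions: the class name QueueingProducer and its send() and pending() methods are made up for illustration, and the consumer is anything with a write() method, such as a transport. To keep the sketch runnable without Twisted installed, it leaves out the interface declaration. In a real program you would declare that the class provides IPushProducer and register it with consumer.registerProducer(producer, True).

```python
from collections import deque


class QueueingProducer:
    """Push producer that buffers only while the consumer has paused it.

    Illustrative sketch: the IPushProducer declaration is omitted so the
    class can be run and tested without Twisted.
    """

    def __init__(self, consumer):
        self.consumer = consumer   # anything with write(data), e.g. a transport
        self.paused = False
        self._pending = deque()    # items accepted while paused

    def send(self, data):
        """Hypothetical entry point: write now if possible, else buffer."""
        if self.paused:
            self._pending.append(data)
        else:
            self.consumer.write(data)

    def pending(self):
        """Number of buffered items, so callers can notice a growing backlog."""
        return len(self._pending)

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        # Drain the backlog in order. The consumer may call pauseProducing()
        # again from inside write(), which ends the loop early.
        while self._pending and not self.paused:
            self.consumer.write(self._pending.popleft())

    def stopProducing(self):
        # The consumer is gone; drop whatever is still buffered.
        self._pending.clear()
```

Code that used to call queue.put(item) would call producer.send(item) instead. The buffer is only non-empty while the producer is paused, so items are never left waiting for a resumeProducing() call that is not coming.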

Jean-Paul Calderone
Isn't it a break in the clean design of a producer if I have to check from outside whether it is running before I can give it data to process? In my opinion, the processing of the data itself should be transparent from the outside.
Manuel Faux
That's a really big question. One answer is that - yes, sure, it is; so you should add a method to your producer for enqueueing data, and that method should inspect the state to decide if it should really go into the queue or if it should be written to the consumer immediately. This keeps all of the logic inside the producer class. However, consider that if all of the users of this producer decide to remain ignorant of whether it is paused, then they may eventually overflow the queue. The **point** of producers is to propagate the low-level buffer-full event so you can avoid overflows.
Jean-Paul Calderone
OK, thanks for your answer. It seems this is the only solution, since no other answer was posted. But I'm still not happy with it.
Manuel Faux