Hello all,

Here's the issue: I have a thread that runs a for-loop reading from a generator and doing some processing on that data. The generator always has data coming in, so it never raises StopIteration. I would like to stop this thread cleanly from the main thread (i.e., exit the for-loop that is processing data from the generator). Below is an example of this scenario; it produces the correct result, but only in the limited sense I describe below:

import threading
import time
import random

def add():
    r = random.Random()
    i = 0
    while True:
        sleep_time = r.randint(0, 3)
        time.sleep(sleep_time)
        yield i
        i = i + 1

class Test(object):

    def __init__(self):
        self.func = add
        self.stopped = False

    def stop(self):
        self.stopped = True

    def run(self):
        self.generator = self.func()
        for x in self.generator:
            print x
            if self.stopped is True:
                break
        print 'DONE'


tester = Test()
thread = threading.Thread(target=tester.run)
thread.daemon = True
thread.start()
time.sleep(10)
print 'Stopping thread'
tester.stop()
print 'Complete, but should stop immediately!'

Now, while this works in the above example (obviously the above doesn't prevent race conditions on self.stopped, but that's not the problem at hand so I left that code out), the problem I have is that the generator in my real code does not always have data immediately, so there can be a long pause between when self.stopped is set and the break statement is actually executed. So, the gist of my problem is that I would like to be able to cleanly exit out of the for-loop as soon as possible, rather than waiting for data from the generator before being able to exit, and obviously the above solution does not do that.

Is there any hope? It's a pretty out-there problem, which likely has no clean solution, but any help would be greatly appreciated.

EDIT: To clarify, in my real application I have a generator (let's denote it as G) which grabs data from a kernel driver. This data is to be sent out to a server, but while the socket is attempting to connect to the server (which may not always be running) I want to process the data from the driver (once connected this processing does not occur). So I launched a thread to grab data from G (and process it) while the main thread attempts to connect to the server. Once connected, ideally the following should occur:

I pause the execution of G, exit the thread, and pass the same G instance to another function which sends the data straight to the server.

From the answers/comments below, I believe this is impossible without destroying G, because there is no way to cleanly pause a currently executing generator.

Sorry for the confusion.

A: 

You need self.generator to have a timeout capability. Conceptually:

wait(1 sec);

rather than just

wait();

I don't know if that's possible (show us your generator code). For example if you were reading from a pipe or a socket don't code

giveMeSomeBytes( buffer);  // wait indefinitely

code

giveMeSomeBytesOrTimeout( buffer, howLongToWait); // wait for a while and 
                                                  // then go see if we should die
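
As a rough Python sketch of the same idea (not the asker's real generator, which was not posted): assume the data arrives on a queue.Queue, and wrap it in a generator that yields None whenever nothing shows up within the timeout. The names poll and consume, and the use of a queue as the data source, are assumptions made for illustration only.

```python
import queue
import threading


def poll(source, timeout):
    """Yield items from a queue.Queue, or None if nothing arrives in time."""
    while True:
        try:
            yield source.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived; hand control back so the caller can check its flag.
            yield None


def consume(source, stop_event, timeout=0.1):
    """Collect items until stop_event is set; returns the items seen."""
    seen = []
    for item in poll(source, timeout):
        if item is not None:
            seen.append(item)  # stand-in for the real processing step
        if stop_event.is_set():
            break
    return seen
```

With this shape the for-loop wakes up at least once per timeout interval, so the worst-case delay before it notices the stop request is roughly the timeout value.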
djna
Having a timeout to exit is almost always a poor solution; finished threads should be cleaned up immediately, without waiting for some arbitrary timeout to expire. This is fine for a quick hack, but not for serious code.
Glenn Maynard
I can't post my generator code, but I changed the above add() function to better emulate how the generator acts
@Glenn. Hmmm, hack? I'll respectfully disagree. There is no clean way to tidy a thread that is waiting indefinitely. The thread itself must be awake enough to stop.
djna
That's why you implement a blocking read that can be woken up. Having a thread wait until a timeout before shutting down is not a clean approach.
Glenn Maynard
@Glenn That's what I was trying to say hence the word "conceptually" ;-) I had better be clearer
djna
The point is that you should not be checking periodically to see if you should exit; exiting should send a proactive *signal*, waking up the blocking thread so it exits immediately. Depending on what's blocking, that's generally done with some combination of thread primitives, socket primitives (e.g. pipes) or signals. (As the OP won't post what's actually blocking, it's impossible to suggest how to actually do this.)
Glenn Maynard
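
One way the "proactive signal" idea from this comment thread could look in Python, sketched under the assumption that the blocking source is a socket (the asker's real source is a kernel driver, so this may not carry over directly). The class name StoppableReader and its attributes are hypothetical. The reader blocks in select on both the data socket and a private wake-up socket; stop() writes one byte to the wake-up socket, so the thread returns from select immediately instead of waiting for data or a timeout.

```python
import select
import socket
import threading


class StoppableReader:
    def __init__(self, data_sock):
        self.data_sock = data_sock
        # wake_r is watched by the reader thread; stop() writes to wake_w.
        self.wake_r, self.wake_w = socket.socketpair()
        self.received = []

    def stop(self):
        # Makes wake_r readable, which wakes the select() call in run().
        self.wake_w.send(b"x")

    def run(self):
        while True:
            # Block with no timeout until data or a stop request arrives.
            readable, _, _ = select.select([self.data_sock, self.wake_r], [], [])
            if self.wake_r in readable:
                break  # stop requested; any unread data is left in the socket
            chunk = self.data_sock.recv(4096)
            if not chunk:
                break  # peer closed the connection
            self.received.append(chunk)
```

A caller would run reader.run in a thread and call reader.stop() from the main thread; closing the wake-up sockets afterwards is left to the caller in this sketch.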
A: 

Sounds like what you really want is a coroutine, not a generator. See David Beazley's mind-bending A Curious Course on Coroutines and Concurrency, which, while being more information than you require and then some, should give you some clarity on what you're trying to do.

Robert Rossney
Coroutines are useful (though Python doesn't do them too well), but they don't automatically provide a mechanism for exiting from a blocking socket read, or whatever it is that his code is actually waiting on.
Glenn Maynard
Ah, I didn't quite get that there was a blocking read involved. Never mind then.
Robert Rossney
A: 

Couldn't you just 'close' the generator? Doing something like:

def stop(self):
    self.generator.close()

def run(self):
    self.generator = self.func()
    try:
        for x in self.generator:
            print x
            time.sleep(1)
    except GeneratorExit:
        pass
    print 'DONE'
rotoglup
A: 

First, generators are probably a red herring; don't worry about them.

The canonical way to solve this kind of producer-consumer problem in Python is to use the standard-library queue module. It acts as an intermediary, allowing your producer thread to keep grabbing/processing data from the kernel into the queue, and your consumer thread to send queue data to the server, without their respective blocking I/O calls interfering with one another.

Here's a sketch of the basic idea, without the details filled in:

from queue import Queue
from threading import Thread

# read_from_kernel() and send_to_server() are placeholders for the
# application's own blocking I/O calls; they are not defined here.

class Application(object):

    def __init__(self):
        self.q = Queue()
        self.running = False

    # From kernel to queue
    def produce(self):
        while self.running:
            data = read_from_kernel()
            self.q.put(data)

    # From queue to server
    def consume(self):
        while self.running:
            data = self.q.get()
            send_to_server(data)

    # Start producer thread, then consume
    def run(self):
        try:
            self.running = True
            producer = Thread(target=self.produce)
            producer.start()
            self.consume()
        finally:
            self.running = False

If self.running is set to False, the above code's produce method will still block inside read_from_kernel until that call next returns, and only then exit; there's little Python can do about that. Whatever system call you use must support interruption somehow: if it's an actual read, for example, your options would include:

  • A short timeout, plus retry handling
  • Non-blocking I/O (but in this case you might want to investigate a framework based around this, like Twisted Python)
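
A minimal sketch of the first option (short timeout plus retry), assuming for illustration that the data source is a socket rather than the kernel driver from the question. The function name produce_with_timeout and its parameters are hypothetical; socket.settimeout and socket.timeout are standard-library names.

```python
import queue
import socket
import threading


def produce_with_timeout(sock, out_queue, stop_event, timeout=0.5):
    """Read from sock into out_queue until stop_event is set or the peer closes."""
    sock.settimeout(timeout)
    while not stop_event.is_set():
        try:
            data = sock.recv(4096)
        except socket.timeout:
            continue  # nothing arrived in time; re-check stop_event and retry
        if not data:
            break  # peer closed the connection
        out_queue.put(data)
```

The trade-off is the one debated in the comments above: shutdown is delayed by up to one timeout interval, in exchange for not needing a separate wake-up mechanism.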
Piet Delport