I have a client connected to a server using twisted. The client has a thread which might potentially be doing things in the background. When the reactor is shutting down, I have to:

1) check if the thread is doing things
2) stop it if it is

What's an elegant way to do this? The best I can do is some confused thing like:

def cleanup(self):
    isWorkingDF = defer.Deferred()
    doneDF = defer.Deferred()

    def checkIsWorking():
        res = self.stuff.isWorking() #blocking call
        reactor.callFromThread(isWorkingDF.callback, res)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            def shutdown():
                self.stuff.shutdown()
                # fire the Deferred via its callback method, not by calling the Deferred itself
                reactor.callFromThread(doneDF.callback, None)
            reactor.callInThread(shutdown)
        else:
            doneDF.callback(None) #no shutdown needed

    isWorkingDF.addCallback(shutdownOrNot)

    reactor.callInThread(checkIsWorking)

    return doneDF

First we check whether it's working at all. The result of that check goes into shutdownOrNot, which either shuts down or doesn't and then fires doneDF, which Twisted waits on before closing.

Pretty messed up eh! Is there a better way?

Maybe a related question: is there a more elegant way to chain callbacks to each other? I could see myself needing more cleanup code after this is done, so I'd have to make a different done deferred and have the current doneDF fire a callback which does stuff and then fires that new deferred.

A: 

If the program is terminating after you shut down the reactor, you could make the thread a daemon thread. A daemon thread is stopped automatically when all the non-daemon threads terminate. Just set daemon = True on the thread object before you call start().
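A minimal sketch of the daemon-thread option, using only the standard library. The `background` function and its sleep loop are placeholders invented for illustration; they stand in for whatever the real thread does.

```python
import threading
import time

def background():
    # Hypothetical background work; loops forever on purpose.
    while True:
        time.sleep(0.05)

thread = threading.Thread(target=background)
thread.daemon = True   # must be set before start()
thread.start()

# The interpreter will not wait for this thread at exit,
# so no explicit shutdown call is needed for it.
```

Note that a daemon thread is stopped abruptly, so this only fits when the thread holds nothing that needs an orderly release.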

If this is not viable, e.g. because the thread has to do resource cleanup before it exits, then you could communicate between the reactor and the thread with a Queue. Push work to be done onto a Queue object and have the thread pull it off and do it. Use a special "FINISH" token (or simply None) to indicate that the thread needs to terminate.
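The queue approach could look roughly like the sketch below. It uses Python 3's `queue` module (named `Queue` in Python 2), and the `worker` function, the `results` list, and the sample tasks are all made up for illustration.

```python
import queue
import threading

FINISH = None  # sentinel chosen for this sketch

def worker(tasks, results):
    """Run callables from the queue until the sentinel arrives."""
    while True:
        task = tasks.get()
        if task is FINISH:
            # Resource cleanup would go here, before the thread exits.
            results.append("cleaned up")
            break
        results.append(task())

tasks = queue.Queue()
results = []
thread = threading.Thread(target=worker, args=(tasks, results))
thread.start()

tasks.put(lambda: 1 + 1)   # ordinary work items
tasks.put(lambda: "done")
tasks.put(FINISH)          # ask the thread to terminate
thread.join()              # returns once the queued work has drained
```

Putting the sentinel on the queue never blocks, so it is safe to do from the reactor thread; only the final `join()` blocks, and in a Twisted program that wait would have to happen off the reactor thread.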

Dave Kirby
I like the 2nd idea, but I'd still have to communicate with the thread both to figure out whether it's on and then to put something on the queue if it is. It doesn't solve the basic problem that those would be blocking calls. I'm practically doing this anyway, except with a single lock-guarded variable instead of a queue. The 1st idea might actually work, but I'm not sure that's exactly the logic I want.
Claudiu
+3  A: 

You can simplify this somewhat by using deferToThread instead of the callInThread/callFromThread pairs:

from twisted.internet.threads import deferToThread

def cleanup(self):
    isWorkingDF = deferToThread(self.stuff.isWorking)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            return deferToThread(self.stuff.shutdown)

    isWorkingDF.addCallback(shutdownOrNot)

    return isWorkingDF

deferToThread is basically just a nice wrapper around the same threading logic you had implemented twice in your version of the function.

Jean-Paul Calderone
ah this might be what I want, yes. i think you added another element here, too - if you return a deferred from a callback called from another deferred, what happens? i wouldn't think that deferred would also be waited for but maybe i'm wrong?
Claudiu
well i still needed more deferreds but defertothread made the code much nicer
Claudiu
If you return a Deferred (x) from a callback on another Deferred (y), then y will suspend processing its callback chain until x has a result. Then y will continue where it left off using the result of x as the result for its next callback.
Jean-Paul Calderone
yup this is just what I wanted. wish I could upvote again
Claudiu
A: 

Ah the real answer is to use the defer.inlineCallbacks decorator. The above code now becomes:

from twisted.internet import defer
from twisted.internet.threads import deferToThread

@defer.inlineCallbacks
def procShutdownStuff(self):
    isWorking = yield deferToThread(self.stuff.isWorking)

    if isWorking:
        yield deferToThread(self.stuff.shutdown)

def cleanup(self):
    return self.procShutdownStuff()
Claudiu