I have a Python app that uses multiple threads, and I am curious about the best way to wait for something in Python without burning CPU or holding the GIL.

My app uses Twisted, and I spawn a thread to run a long operation so that I do not block the reactor thread. This long operation also spawns some threads using Twisted's deferToThread to do other work, and the original thread wants to wait for the results from the Deferreds.

What I have been doing is this:

while self._waiting:
    time.sleep( 0.01 )

This seemed to keep Twisted PB objects from receiving messages, so I thought sleep was holding the GIL. Further investigation by the posters below revealed, however, that it does not.

Better ways to wait on threads without blocking the reactor thread or the Python interpreter are posted below.
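For reference, a minimal stdlib-only sketch of replacing the sleep-polling loop above with a threading.Event. The LongOperation class and its method names are hypothetical stand-ins for whatever object owned self._waiting; this does not involve Twisted at all and only shows the blocking-wait idea.

```python
import threading
import time


class LongOperation(object):
    """Hypothetical stand-in for the object that owned self._waiting."""

    def __init__(self):
        self._done = threading.Event()  # replaces the self._waiting flag
        self.result = None

    def on_result(self, result):
        # Called from whichever thread finishes the work.
        self.result = result
        self._done.set()

    def wait_for_result(self, timeout=None):
        # Blocks without polling; returns False if the timeout expires first.
        return self._done.wait(timeout)


def demo():
    op = LongOperation()

    def work():
        time.sleep(0.05)  # pretend to do something slow
        op.on_result(42)

    worker = threading.Thread(target=work)
    worker.start()
    finished = op.wait_for_result(timeout=5)
    worker.join()
    return finished, op.result
```

The waiting thread sleeps inside Event.wait() until set() is called, so there is no 10 ms polling interval to tune.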

+5  A: 

Have you tried condition variables? They are used like

from threading import Condition

condition = Condition()

def consumer_in_thread_A():
    condition.acquire()
    try:
        while resource_not_yet_available:
            condition.wait()
        # Here, the resource is available and may be 
        # consumed
    finally:
        condition.release()

def produce_in_thread_B():
    # ... create resource, whatsoever
    condition.acquire()
    try:
        condition.notify_all()
    finally:
        condition.release()

Condition variables act as locks (acquire and release), but their main purpose is to provide a control mechanism that lets a thread wait until another thread calls notify or notify_all on them.
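A self-contained variant of the same pattern, as a rough sketch: here the "resource" is assumed to be a plain list called items (a hypothetical name), and the with statement takes the place of the explicit acquire/release pairs.

```python
import threading

condition = threading.Condition()
items = []  # hypothetical shared resource guarded by the condition


def consumer(results):
    with condition:          # acquires on entry, releases on exit
        while not items:     # re-check the predicate after every wake-up
            condition.wait()
        results.append(items.pop(0))


def producer(value):
    with condition:
        items.append(value)  # change shared state while holding the lock
        condition.notify_all()


def demo():
    results = []
    t = threading.Thread(target=consumer, args=(results,))
    t.start()
    producer("resource")
    t.join(5)
    return results
```

The while loop around wait() matters: the consumer only proceeds once the predicate actually holds, whether or not it was woken by a notify.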

Dirk
Condition variables should work nicely if I can work them into my program. It's a little more complicated than a one-line change, though =P
Charles
+1  A: 

The threading module allows you to spawn a thread, which is then represented by a Thread object. That object has a join method that you can use to wait for the subthread to complete.

See http://docs.python.org/library/threading.html#module-threading
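A minimal sketch of waiting with join, assuming you create the Thread objects yourself (so it does not apply directly to deferToThread). The run_batch helper is a made-up name for illustration.

```python
import threading


def run_batch(func, arg_list):
    """Run func(arg) in one thread per arg and block until all have finished."""
    results = [None] * len(arg_list)

    def worker(index, arg):
        results[index] = func(arg)

    threads = [threading.Thread(target=worker, args=(i, a))
               for i, a in enumerate(arg_list)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # blocks the caller without spinning
    return results
```

For example, run_batch(lambda x: x * 100, [1, 2, 3]) returns once all three threads are done, with the results in argument order.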

Brian Clapper
Normally this would work nicely, but the sub-threads I am spawning are actually Twisted's deferToThread calls, so I do not get a Thread object back. Technically I can wait on the Deferreds, but that is a little messy.
Charles
The whole idea behind Deferreds is that you DON'T wait for them. Instead, you just add a callback that fires when the deferred operation completes. Can't you just organize your application that way, instead?
Brian Clapper
Well yes I defer the action to a thread and give twisted a callback. But the parent thread needs to wait for the whole series of sub threads to complete before it can move onto a new set of sub threads to spawn.
Charles
The deferred callback could simply count the number of threads that have called it back, and when the number reaches the total number originally spawned, the callback then knows it can move on to the second set.
Brian Clapper
+2  A: 

According to the Python source, time.sleep() does not hold the GIL.

http://code.python.org/hg/trunk/file/98e56689c59c/Modules/timemodule.c#l920

Note the use of Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS, as documented here:

http://docs.python.org/c-api/init.html#thread-state-and-the-global-interpreter-lock
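A quick way to see this from Python, as a rough sketch rather than a proof: a worker thread keeps counting while the main thread is inside time.sleep(). The function name and the 0.2 second default are arbitrary choices for the demo.

```python
import threading
import time


def count_while_main_sleeps(sleep_seconds=0.2):
    stop = threading.Event()
    ticks = [0]  # a list so the nested function can mutate it

    def worker():
        while not stop.is_set():
            ticks[0] += 1

    t = threading.Thread(target=worker)
    t.start()
    time.sleep(sleep_seconds)  # the worker can only advance if the GIL is free here
    stop.set()
    t.join()
    return ticks[0]
```

If sleep held the GIL for its whole duration, the worker would make essentially no progress; in practice the returned count is large.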

Forest
Yeah, I noticed with some simple tests that time.sleep seemed to work, but for some reason Twisted's PB tools do not seem to get messages when a thread is sleeping. Maybe there is something else wrong here. Thanks.
Charles
+4  A: 

I recently found out that calling time.sleep( X ) will lock the GIL for the entire time X and therefore freeze ALL python threads for that time period.

You found wrongly -- this is definitely not how it works. What's the source where you found this mis-information?

Anyway, then you clarify (in comments -- better edit your Q!) that you're using deferToThread and your problem with this is that...:

Well yes I defer the action to a thread and give twisted a callback. But the parent thread needs to wait for the whole series of sub threads to complete before it can move onto a new set of sub threads to spawn

So use as the callback a method of an object with a counter -- start it at 0, increment it by one every time you're deferring-to-thread and decrement it by one in the callback method.

When the callback method sees that the decremented counter has gone back to 0, it knows that we're done waiting "for the whole series of sub threads to complete" and then the time has come to "move on to a new set of sub threads to spawn", and thus, in that case only, calls the "spawn a new set of sub threads" function or method -- it's that easy!

E.g. (net of typos &c as this is untested code, just to give you the idea)...:

from twisted.internet import threads

class Waiter(object):

  def __init__(self, what_next, *a, **k):
    self.counter = 0
    self.what_next = what_next
    self.a = a
    self.k = k

  def one_more(self):
    self.counter += 1

  def do_wait(self, *dont_care):
    self.counter -= 1
    if self.counter == 0:
      self.what_next(*self.a, **self.k)


def spawn_one_thread(waiter, long_calculation, *a, **k):
  waiter.one_more()
  d = threads.deferToThread(long_calculation, *a, **k)
  d.addCallback(waiter.do_wait)

def spawn_all(waiter, list_of_lists_of_functions_args_and_kwds):
  if not list_of_lists_of_functions_args_and_kwds:
    return
  if waiter is None:
    # what_next re-enters spawn_all with waiter=None, so each batch gets a fresh Waiter
    waiter = Waiter(spawn_all, None, list_of_lists_of_functions_args_and_kwds)
  this_time = list_of_lists_of_functions_args_and_kwds.pop(0)
  for f, a, k in this_time:
    spawn_one_thread(waiter, f, *a, **k)

def start_it_all(list_of_lists_of_functions_args_and_kwds):
  spawn_all(None, list_of_lists_of_functions_args_and_kwds)
Alex Martelli
This is neat. I have been using a signaling system, but this looks like it might be cleaner.
Charles
+10  A: 

If you're already using Twisted, you should never need to "wait" like this.

As you've described it:

I spawn a thread to run a long operation ... This long operation also spawns some threads using twisted's deferToThread ...

That implies that you're calling deferToThread from your "long operation" thread, not from your main thread (the one where reactor.run() is running). As Jean-Paul Calderone already noted in a comment, you can only call Twisted APIs (such as deferToThread) from the main reactor thread.

The lock-up that you're seeing is a common symptom of not following this rule. It has nothing to do with the GIL, and everything to do with the fact that you have put Twisted's reactor into a broken state.

Based on your loose description of your program, I've tried to write a sample program that does what you're talking about based entirely on Twisted APIs, spawning all threads via Twisted and controlling them all from the main reactor thread.

import time

from twisted.internet import reactor
from twisted.internet.defer import gatherResults
from twisted.internet.threads import deferToThread, blockingCallFromThread

def workReallyHard():
    "'Work' function, invoked in a thread."
    time.sleep(0.2)

def longOperation():
    for x in range(10):
        workReallyHard()
        blockingCallFromThread(reactor, startShortOperation, x)
    result = blockingCallFromThread(reactor, gatherResults, shortOperations)
    return 'hooray', result

def shortOperation(value):
    workReallyHard()
    return value * 100

shortOperations = []

def startShortOperation(value):
    def done(result):
        print('Short operation complete!', result)
        return result
    shortOperations.append(
        deferToThread(shortOperation, value).addCallback(done))

d = deferToThread(longOperation)
def allDone(result):
    print('Long operation complete!', result)
    reactor.stop()
d.addCallback(allDone)

reactor.run()

Note that at the point in allDone where the reactor is stopped, you could fire off another "long operation" and have it start the process all over again.

Glyph
"If you're already using Twisted, you should never need to "wait" like this.". I cant believe nobody mentioned this till now.
jeffjose
@Glyph This works well, but with a bit of experimentation I found that it is a little more involved than I would like. It seems that operations called using reactor threads block the reactor event loop. I used callInThread and callLater for a simple test: if my callInThread method sleeps for 30 seconds and my callLater is set to fire at 10 seconds, I do not get the callLater until AFTER callInThread is complete. Is it possible to have a reactor-safe thread block? Or should I start ripping things apart?
Charles
Charles, I'm sorry, but I don't know what you're talking about. callInThread will call your function in a thread, and not block the reactor mainloop; that's the whole point. I'd need to see a complete, working example of the code you're talking about to be more helpful.
Glyph
@Glyph Sorry, I really wish we could format code in comments. But no matter: I ripped apart what I was doing and threw in a quick-and-dirty callback design, and it appears to be much more stable. I have some cleaning up to do, but I think the project will be better for it :)
Charles