I need to write a simple app that runs two threads:

  • thread 1: runs at timed intervals, let's say every minute
  • thread 2: just a 'normal' while True loop that does 'stuff'

If it weren't for the requirement to run at a timed interval, I would not have looked at Twisted at all, but a simple sleep(60) is not good enough, and a construction like:

from twisted.internet import reactor, task

l = task.LoopingCall(timed_thread)
l.start(60.0)
reactor.run()

looked like a really simple way to achieve what I wanted.

Now, how do I 'properly' add another thread?

I see two options here:

  • Use the threading library and run two 'python threads', one executing my while loop and the other running reactor.run(). But what I found on Google seems to object to this approach and suggests using Twisted's threading instead.
  • Use Twisted's threading. That's what I've tried, but somehow it looks a bit clumsy to me.

Here's what I came up with:

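import time
from twisted.internet import reactor, task

def timed_thread():
    # Scheduled by LoopingCall; runs in the reactor thread
    print('i will be called every 1 minute')

def normal_thread():
    # Runs in a thread from the reactor's thread pool
    print('this is a normal thread')
    time.sleep(30)

l = task.LoopingCall(timed_thread)
l.start(60.0)
reactor.callInThread(normal_thread)
reactor.run()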

That seems to work, but I can't stop the app! Pressing ^C does nothing (without callInThread it just stops as you'd expect). ^Z drops out to the shell, and if I then do 'kill %1' it seems to kill the process (the shell reports that), but the 'normal' thread keeps on running. 'kill PID' doesn't get rid of it either, and the only cure is kill -9. Really strange.

So, what am I doing wrong? Is this the correct approach to running two threads in Twisted? Should I not bother with Twisted at all? What other 'standard' alternatives are there for implementing timed calls? (By 'standard' I mean something I can easy_install or yum install; I don't want to start downloading and using random scripts from random web pages.)

+2  A: 

Assuming that your main is relatively non-blocking:

import random
from twisted.internet import task

class MyProcess:
    def __init__(self):
        self.stats = []
        self.lp = None

    def myloopingCall(self):
        # Timed job: report how much work the main function has done so far
        print("I have %s stats" % len(self.stats))

    def myMainFunction(self, reactor):
        # One short step of the "main loop"; it reschedules itself so the
        # reactor can run other calls in between
        self.stats.append(random.random())
        reactor.callLater(0, self.myMainFunction, reactor)

    def start(self, reactor):
        self.lp = task.LoopingCall(self.myloopingCall)
        self.lp.start(2)  # every 2 seconds, first call immediately
        reactor.callLater(0, self.myMainFunction, reactor)

    def stop(self):
        if self.lp is not None:
            self.lp.stop()
        print("I'm done")

if __name__ == '__main__':
    myproc = MyProcess()
    from twisted.internet import reactor
    reactor.callWhenRunning(myproc.start, reactor)
    reactor.addSystemEventTrigger('during', 'shutdown', myproc.stop)
    reactor.callLater(10, reactor.stop)  # end the demo after 10 seconds
    reactor.run()

$ python bleh.py
I have 0 stats
I have 33375 stats
I have 66786 stats
I have 100254 stats
I have 133625 stats
I'm done
MattH
This doesn't use threads.
MattH
you mean this doesn't use python threads. i suspect it's still threaded by the twisted lib?
pulegium
Nope, this is single-threaded. Twisted only uses threads if you tell it to. Have a read of http://twistedmatrix.com/documents/current/core/howto/threading.html
MattH
ah i see now... hhmm... so how to make this 'truly' threaded app??? your example kind of does what's needed, but is restricted by time, ie main loop in some cases may take more than a minute to execute and i don't want two of them running at the same time....
pulegium
`task.LoopingCall` won't run twice if another function is blocking when it "would" be called. It will be called when there is the opportunity (i.e. the blocking call has returned) and its next call will be rescheduled according to when it was actually called.
MattH
If this form of scheduling isn't suitable for your purposes, then it *might* be possible using "real" threads with Twisted, but that's more complicated and depends on exactly what you're doing. Refer to the howto in my previous comment.
MattH
hmm... well i think i still need to have possibility to run more than one thread. basically, looping thread generates data on a timely basis, every minute for example. that doesn't need to be super precise, but should not deviate more than 1 sec. other threads operate on that data, and currently i need only 1, but might require more in the future, so need to be able to run more than one 'main' thread, that was basically my original question... :)
pulegium
You could use `callInThread` to run each loop of your main threads (plural), and collate the results within the parent process, as long as the data is only passed in and passed out. Then have a 60 second scheduled call that reports on the collated data. You'll have 'aliasing' problems when the "main" calls take longer than a minute. Essentially, what you're actually asking is pretty complicated and fraught with pitfalls. Your original question doesn't reflect to me what you're asking now, and I should have probably asked you for more clarification before trying to help.
MattH
well look at the example that i posted in my question, it does have `callInThread` in it, but I then have trouble stopping it.. all 'threads' are supposed to be infinite, i'm planning to run this app as a daemon process, and don't mind SIGHUP'ing it, but atm with twisted it's just SIGKILL that works. The only option I see atm is to write 3 threads: 1) timer thread: changes state from 0 to 1 and back to 0 every 1 sec, loops constantly; 2) timed thread: watches for the state change and reacts appropriately; 3) other 'normal' threads. Bit ugly, I thought python would already have something like that built in.
pulegium
In your example your "thread" methods just print. I had no idea whether your "real" threads are going to stomp all over each other, so I've tried to provide a solution that won't.
MattH
the real threads are completely separate from an implementation pov. the timed thread polls data from a source and inserts it into a DB. that needs to happen every minute, doesn't really matter when in the minute, but every minute. and no skips are allowed (when possible of course). the other thread(s) read the data from the DB and process it. processing might take a while, but it's not time constrained. t1 might produce 100 records in 1 minute, but only 1 or even 0 in another. so the load on t2 is undefined, but that's ok, it can take as long as it needs. it's important though to run t1 every minute.
pulegium
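The setup described in the comment above (one timed producer, any number of unconstrained consumers) can also be sketched with only the standard library. This is an illustration under assumptions, not the asker's code: queue.Queue stands in for the DB table, and timed_producer, consumer, poll and handle are hypothetical names. The producer keeps a fixed schedule with time.monotonic(), so slow polls do not accumulate drift, and if a poll overruns it skips the missed ticks rather than firing a burst.

```python
import queue
import threading
import time


def timed_producer(interval, work_queue, stop_event, poll):
    """Call poll() once per interval on a fixed schedule and queue its records."""
    next_tick = time.monotonic()
    while not stop_event.is_set():
        for record in poll():
            work_queue.put(record)
        next_tick += interval
        delay = next_tick - time.monotonic()
        if delay < 0:
            # poll() overran: skip the ticks already missed instead of
            # running several polls back to back
            missed = int(-delay // interval) + 1
            next_tick += missed * interval
            delay = next_tick - time.monotonic()
        # Event.wait is an interruptible sleep: it returns early on shutdown
        stop_event.wait(delay)


def consumer(work_queue, stop_event, handle):
    """Process queued records at its own pace until asked to stop."""
    while not stop_event.is_set():
        try:
            record = work_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        # Records still queued when stop_event is set are left unprocessed
        handle(record)
```

To use it, start one threading.Thread for timed_producer and one or more for consumer, all sharing the same queue and threading.Event, then set the event to stop them. Adding a consumer later only means starting another thread on the same queue.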
+2  A: 

You didn't explain why you actually need threads here. If you had, I might have been able to explain why you don't need them. ;)

That aside, I can confirm that your basic understanding of things is correct. One possible misunderstanding I can clear up, though, is the notion that "python threads" and "Twisted threads" are at all different from each other. They're not. Python provides a threading library. All of Twisted's thread APIs are implemented in terms of Python's threading library. Only the API is different.

As far as shutdown goes, you have two options.

  • Start your run-forever thread using Python's threading APIs directly and make the thread a daemon. Your process can exit even while daemon threads are still running. A possible problem with this solution is that some versions of Python have issues with daemon threads that will lead to a crash at shutdown time.
  • Create your thread using either Twisted's APIs or the stdlib threading APIs but also add a Twisted shutdown hook using reactor.addSystemEventTrigger('before', 'shutdown', f). In that hook, communicate with the work thread and tell it to shut down. For example, you could share a threading.Event between the Twisted thread and your work thread and have the hook set it. The work thread can periodically check to see if it has been set and exit when it notices that it has been. Aside from not crashing, this gives another advantage over daemon threads - it will let you run some cleanup or finalization code in your work thread before the process exits.
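A minimal sketch of the second option, assuming a hypothetical worker function and start_worker helper (neither comes from this answer). The reactor is passed in as a parameter, so the only Twisted call is reactor.addSystemEventTrigger('before', 'shutdown', ...); everything else is the stdlib threading module.

```python
import threading
import time


def worker(stop_event, results):
    """Run-forever loop that exits once stop_event has been set."""
    while not stop_event.is_set():
        results.append(time.time())  # placeholder for the real work
        # wait() acts as an interruptible sleep: it returns as soon as
        # the event is set instead of sleeping the full timeout
        stop_event.wait(0.05)
    # Cleanup or finalization code runs here, before the thread ends
    results.append("cleanup done")


def start_worker(reactor, results):
    """Start the worker thread and register a hook that stops it at shutdown."""
    stop_event = threading.Event()
    thread = threading.Thread(target=worker, args=(stop_event, results))
    thread.start()
    # When the reactor begins shutting down it calls stop_event.set(),
    # which the worker notices on its next check
    reactor.addSystemEventTrigger('before', 'shutdown', stop_event.set)
    return thread, stop_event
```

With the real reactor this would be called as start_worker(reactor, []) before reactor.run(). Stopping the reactor (for example with ^C) should then fire the hook, and the process can exit once the worker has left its loop. The shorter the wait timeout and each unit of work, the sooner the thread notices the request.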
Jean-Paul Calderone
+1 Hi JP, thanks for the assist. I don't think I'm up to the task of a twisted proponent.
MattH
np Matt. :) I think your answer was pretty good, except for dealing with the mysterious cases which "may take more than a minute" to complete. If we knew what those cases were, we might be able to suggest a way to adapt your solution to deal with them and actually eliminate the use of threads.
Jean-Paul Calderone
see the description in the comments on Matt's answer. hard to explain, there are various cases that i need to deal with. take this as an example (which is quite close to reality): t1 reads event entries from let's say 1mil calendars and puts events that are scheduled to happen that minute into a DB table. that's it. t2 (t3,4,...) crawls through the table and performs the instructions from it. there's no requirement that an event be processed that exact minute, it's just that it needs to get into the queue at that precise moment in time. so t2(3,4,...) have all the time in the world, but t1 is constrained.
pulegium
sorry if that sounds vague, but that's what i have to deal with :)
pulegium
oh.. why timed and at one-minute intervals.. because the 'calendars' are updated constantly and if I don't pull the data off them, it'll get lost. It's just a strange way of implementing queues, but it's out of my control i'm afraid. why not more often? because the data in there is updated at exactly one-minute intervals and I have 2 choices: have a read_it flag (for a mil or so of them - really ugly) or wait a minute and rely on the fact that it's going to be new data. reading might take longer than a minute, but in that case i'll just miss a beat - not good, but not catastrophic either.
pulegium
Awesome, now we're getting somewhere. How do you pull data off the 'calendars'? Are you seriously saying 1 million fetches per minute, every minute? I think you'll be hard-pressed to get that performance without splitting the task between machines.
MattH