How to run background tasks on App Engine?
GAE is a very useful tool for building scalable web applications. A few limitations pointed out by many are the lack of support for background tasks, the lack of periodic tasks, and the strict limit on how much time each HTTP request can take; if a request exceeds that limit, the operation is terminated, which makes running time-consuming tasks impossible.
How to run a background task?
In GAE, code is executed only when there is an HTTP request, and there is a strict time limit (around 10 seconds, I think) on how long that code can run. So if there are no requests, no code is executed. One suggested workaround was to use an external box to send requests continuously, in effect creating a background task. But for this we need an external box, and now we depend on one more element. Another alternative was sending a 302 redirect response so that the client re-sends the request, but this also makes us dependent on an external element, namely the client. What if that external box is GAE itself? Everyone who has used a functional language without a looping construct knows the alternative: recursion replaces the loop. So what if we complete part of the computation and do an HTTP GET on the same URL with a very short timeout, say 1 second? This creates a loop (via recursion) in PHP code running on Apache.
<?php
// Carry the iteration count in the query string.
$i = 0;
if (isset($_REQUEST["i"])) {
    $i = $_REQUEST["i"];
    sleep(1);
}
// Re-request this same script with an incremented counter and a very
// short timeout, so the "loop" continues in a fresh request.
$ch = curl_init("http://localhost" . $_SERVER["PHP_SELF"] . "?i=" . ($i + 1));
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_TIMEOUT, 1);
curl_exec($ch);
curl_close($ch);
print "hello world\n";
?>
Somehow this does not work on GAE. So what if we do an HTTP GET on some other URL, say url2, which in turn does an HTTP GET on the first URL? This seems to work on GAE. The code looks like this.
import time

from google.appengine.api import urlfetch
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

class FirstUrl(webapp.RequestHandler):
    def get(self):
        self.response.out.write("ok")
        time.sleep(2)
        # Ping the second URL, which will ping this one back.
        urlfetch.fetch("http://" + self.request.headers["HOST"] + '/url2')

class SecondUrl(webapp.RequestHandler):
    def get(self):
        self.response.out.write("ok")
        time.sleep(2)
        urlfetch.fetch("http://" + self.request.headers["HOST"] + '/url1')

application = webapp.WSGIApplication([('/url1', FirstUrl), ('/url2', SecondUrl)])

def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()
Since we have found a way to run a background task, let's build abstractions on top of it: a periodic task (timer) and a looping construct that spans many HTTP requests (foreach).
Timer
Building a timer is now straightforward. The basic idea is to keep a list of timers and the interval at which each should be called, and to invoke the callback whenever its interval has elapsed. We will use memcache to maintain the timer list. To know when to call a callback, we store a key in memcache with the interval as its expiration time. We periodically (say every 5 seconds) check whether that key is present; if it is not, we call the callback and set the key again with the same interval.
from google.appengine.api import memcache

def timer(func, interval):
    # Register a callback (by name) to be invoked every `interval` seconds.
    timerlist = memcache.get('timer')
    if timerlist is None:
        timerlist = []
    timerlist.append({'func': func, 'interval': interval})
    # This key expires after `interval` seconds; its absence means "fire now".
    memcache.set('timer-' + func, '1', interval)
    memcache.set('timer', timerlist)

def checktimers():
    timerlist = memcache.get('timer')
    if timerlist is None:
        return False
    for current in timerlist:
        if memcache.get('timer-' + current['func']) is None:
            # Reset the interval key, then invoke the callback by name.
            memcache.set('timer-' + current['func'], '1', current['interval'])
            try:
                eval(current['func'] + '()')
            except:
                pass
            return True
    return False
Foreach
This is needed when we want to do a long-running computation, say performing some operation on 1000 database rows or fetching 1000 URLs. The basic idea is to maintain a list of callbacks and their arguments in memcache, and on each check invoke the callback with the next argument.
def foreach(func, args):
    # Queue a callback (by name) together with its list of arguments.
    looplist = memcache.get('foreach')
    if looplist is None:
        looplist = []
    looplist.append({'func': func, 'args': args})
    memcache.set('foreach', looplist)

def checkloops():
    looplist = memcache.get('foreach')
    if looplist is None:
        return False
    if len(looplist) > 0 and len(looplist[0]['args']) > 0:
        # Take the next argument off the first pending loop.
        arg = looplist[0]['args'].pop(0)
        func = looplist[0]['func']
        if len(looplist[0]['args']) == 0:
            looplist.pop(0)
        if len(looplist) > 0 and len(looplist[0]['args']) > 0:
            memcache.set('foreach', looplist)
        else:
            memcache.delete('foreach')
        # Invoke the callback with a single argument per check.
        try:
            eval(func + '(' + repr(arg) + ')')
        except:
            pass
        return True
    else:
        return False

# instead of
#   for index in range(0, 1000):
#       someoperation(index)
# we will say
#   foreach('someoperation', range(0, 1000))
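Something still has to drive these checks. One way to wire them in, as a minimal sketch assuming the url1/url2 handlers from above (the complete code linked below does the actual wiring), is to fire at most one timer or one loop iteration on every bounce of the background loop:

class FirstUrl(webapp.RequestHandler):
    def get(self):
        self.response.out.write("ok")
        # Run one unit of pending work per bounce of the loop.
        if not checktimers():
            checkloops()
        time.sleep(2)
        urlfetch.fetch("http://" + self.request.headers["HOST"] + '/url2')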
Now building a program that fetches a list of URLs every hour is straightforward. Here is the code; note that the timer fires every 3 minutes, but each URL is refetched only after its hour-long memcache entry expires.
def getone(url):
    try:
        result = urlfetch.fetch(url)
        if result.status_code == 200:
            # Mark this URL as fetched for the next hour.
            memcache.set(url, '1', 60 * 60)
            # process result.content
    except:
        pass

def getallurl():
    # List of URLs to be fetched.
    urllist = ['http://www.google.com/', 'http://www.cnn.com/',
               'http://www.yahoo.com', 'http://news.google.com']
    fetchlist = []
    for url in urllist:
        if memcache.get(url) is None:
            fetchlist.append(url)
    # This is equivalent to:
    #   for url in fetchlist: getone(url)
    if len(fetchlist) > 0:
        foreach('getone', fetchlist)

# Register the timer callback.
timer('getallurl', 3 * 60)
The complete code is here: http://groups.google.com/group/httpmr-discuss/t/1648611a54c01aa. I have been running this code on App Engine for a few days without much problem.
Warning: we make heavy use of urlfetch. The limit on the number of urlfetch calls per day is 160,000, so be careful not to reach that limit.
An upcoming version of the runtime will have some kind of periodic execution engine, à la cron. See this message on the App Engine group:
"So, all the SDK pieces appear to work, but my testing indicates this isn't running on the production servers yet -- I set up an 'every 1 minutes' cron that logs when it runs, and it hasn't been called yet. Hard to say when this will be available, though..."
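For reference, the SDK's cron configuration lives in a cron.yaml file next to app.yaml. A minimal sketch that would invoke the getallurl logic above (the /getallurl URL mapping is an assumption, not part of the code shown):

cron:
- description: refresh the url list   # illustrative description
  url: /getallurl                     # assumed handler mapping
  schedule: every 1 minutes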
You may also use the Task Queue Python API (see the Google documentation). Be cautious, since it is currently released as an experimental feature and subject to change.
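A minimal sketch of what that looks like; the handler names and the /worker URL are illustrative, getone is the function defined above, and in the experimental release the module lives under google.appengine.api.labs:

from google.appengine.api.labs import taskqueue
from google.appengine.ext import webapp

class Enqueue(webapp.RequestHandler):
    def get(self):
        # Enqueue a background task; App Engine will POST to /worker later.
        taskqueue.add(url='/worker', params={'url': 'http://www.google.com/'})
        self.response.out.write("queued")

class Worker(webapp.RequestHandler):
    def post(self):
        # Runs in the background, with its own request deadline.
        getone(self.request.get('url'))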