views:

1767

answers:

3

How to run background tasks on App Engine?

+5  A: 

GAE is a very useful tool for building scalable web applications. The limitations most often pointed out are the lack of support for background tasks, the lack of periodic tasks, and a strict limit on how long each HTTP request may take. If a request exceeds that limit, the operation is terminated, which makes running time-consuming tasks impossible.

How to run a background task?
In GAE, code is executed only when there is an HTTP request, and there is a strict limit (I think 10 seconds) on how long that code can run. So if there are no requests, no code is executed. One suggested workaround is to use an external box that sends requests continuously, in effect creating a background task. But this requires an external box, so we now depend on one more element. Another alternative is to send a 302 redirect response so that the client re-sends the request; this also makes us dependent on an external element, namely the client. What if that external box is GAE itself? Anyone who has used a functional language without a looping construct knows the alternative: recursion replaces the loop. So what if we complete part of the computation and then do an HTTP GET on the same URL with a very short timeout, say 1 second? The following PHP code, running on Apache, creates such a loop (recursion).

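<?php
// Read the iteration counter; pause briefly on every hop after the first.
$i = 0;
if (isset($_REQUEST["i"])) {
    $i = (int) $_REQUEST["i"];
    sleep(1);
}
// Request this same script again with the counter incremented.
$ch = curl_init("http://localhost" . $_SERVER["PHP_SELF"] . "?i=" . ($i + 1));
curl_setopt($ch, CURLOPT_HEADER, 0);
// Stop waiting for the response after 1 second.
curl_setopt($ch, CURLOPT_TIMEOUT, 1);
curl_exec($ch);
print "hello world\n";
?>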

Somehow the same self-request approach does not work on GAE. So what if we do an HTTP GET on some other URL, say url2, which in turn does an HTTP GET on the first URL? This seems to work on GAE. The code looks like this.

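import time

from google.appengine.api import urlfetch
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

class FirstUrl(webapp.RequestHandler):
    def get(self):
        self.response.out.write("ok")
        time.sleep(2)
        # Hand off to /url2, which will call back into /url1.
        urlfetch.fetch("http://" + self.request.headers["HOST"] + "/url2")

class SecondUrl(webapp.RequestHandler):
    def get(self):
        self.response.out.write("ok")
        time.sleep(2)
        # Hand off to /url1 to keep the chain going.
        urlfetch.fetch("http://" + self.request.headers["HOST"] + "/url1")

application = webapp.WSGIApplication([('/url1', FirstUrl), ('/url2', SecondUrl)])

def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()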

Now that we have found a way to run a background task, let's build abstractions for a periodic task (timer) and a looping construct that spans many HTTP requests (foreach).

Timer
Building a timer is now straightforward. The basic idea is to keep a list of timers and the interval at which each should be called; once an interval has elapsed, we call the callback function. We use memcache to maintain the timer list. To find out when to call a callback, we store a key in memcache with the interval as its expiration time. We periodically (say every 5 seconds) check whether that key is present; if it is not, we call the callback and set the key again with the same interval.

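from google.appengine.api import memcache

def timer(func, interval):
    """Register the function named `func` to run every `interval` seconds."""
    timerlist = memcache.get('timer')
    if timerlist is None:
        timerlist = []
    # Skip names that are already registered so that reloading the module
    # does not add duplicate entries.
    if not any(t['func'] == func for t in timerlist):
        timerlist.append({'func': func, 'interval': interval})
    # The marker key expires after `interval`; its absence means "due".
    memcache.set('timer-' + func, '1', interval)
    memcache.set('timer', timerlist)

def checktimers():
    """Run at most one due callback; return True if one ran."""
    timerlist = memcache.get('timer')
    if timerlist is None:
        return False
    for current in timerlist:
        if memcache.get('timer-' + current['func']) is None:
            # Reset the interval before invoking the callback.
            memcache.set('timer-' + current['func'], '1', current['interval'])
            try:
                # Callbacks are stored by name, so resolve them with eval.
                eval(current['func'] + '()')
            except Exception:
                pass
            return True
    return False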
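
To see the expiring-key idea in isolation, here is a minimal sketch that runs without App Engine. `FakeCache` and `check_timer` are hypothetical names introduced only for this illustration; `FakeCache` is an in-memory stand-in for memcache with an injectable clock, not the real API.

```python
import time


class FakeCache(object):
    """In-memory stand-in for memcache: values vanish after their expiry."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._data = {}

    def set(self, key, value, ttl=0):
        # A ttl of 0 means the value never expires.
        expires = self._clock() + ttl if ttl else None
        self._data[key] = (value, expires)

    def get(self, key):
        value, expires = self._data.get(key, (None, None))
        if expires is not None and self._clock() >= expires:
            del self._data[key]
            return None
        return value


def check_timer(cache, name, interval, callback):
    """Run callback if the marker key for `name` has expired, then re-arm it."""
    if cache.get('timer-' + name) is not None:
        return False
    cache.set('timer-' + name, '1', interval)
    callback()
    return True
```

With a fake clock, the callback runs only on the first check after the marker key has expired; checks made while the key is still present do nothing.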

Foreach
This is needed for long-running computations, such as performing an operation on 1000 database rows or fetching 1000 URLs. The basic idea is to keep a list of callbacks and their arguments in memcache and, on each invocation, call one callback with one argument.

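def foreach(func, args):
    """Queue the function named `func` to be called once per item in `args`."""
    looplist = memcache.get('foreach')
    if looplist is None:
        looplist = []
    looplist.append({'func': func, 'args': args})
    memcache.set('foreach', looplist)

def checkloops():
    """Process one queued argument; return True if a callback ran."""
    looplist = memcache.get('foreach')
    if looplist is None:
        return False
    if len(looplist) > 0 and len(looplist[0]['args']) > 0:
        arg = looplist[0]['args'].pop(0)
        func = looplist[0]['func']
        # Drop the entry once its argument list is exhausted.
        if len(looplist[0]['args']) == 0:
            looplist.pop(0)
        # Persist the remaining work, or clear the key when nothing is left.
        if len(looplist) > 0 and len(looplist[0]['args']) > 0:
            memcache.set('foreach', looplist)
        else:
            memcache.delete('foreach')
        try:
            # Callbacks are stored by name, so resolve them with eval.
            eval(func + '(' + repr(arg) + ')')
        except Exception:
            pass
        return True
    else:
        return False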

# instead of
# for index in range(0, 1000):
#   someoperaton(index)
# we will say
# foreach('someoperaton', range(0, 1000))
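
The one-item-per-request idea can also be sketched without memcache or eval. In this illustration a plain list stands in for the memcache entry and a dictionary maps names to functions; `CALLBACKS`, `register`, `enqueue` and `step` are hypothetical names, not part of the code above.

```python
CALLBACKS = {}


def register(func):
    """Make func callable by name, replacing the eval lookup."""
    CALLBACKS[func.__name__] = func
    return func


def enqueue(queue, name, args):
    """Queue the registered function `name` to run once per item in args."""
    queue.append({'func': name, 'args': list(args)})


def step(queue):
    """Process one pending argument; return False when nothing is left."""
    # Discard entries that have no arguments left.
    while queue and not queue[0]['args']:
        queue.pop(0)
    if not queue:
        return False
    entry = queue[0]
    arg = entry['args'].pop(0)
    if not entry['args']:
        queue.pop(0)
    CALLBACKS[entry['func']](arg)
    return True
```

Each call to `step` corresponds to one HTTP request in the scheme above: it does a single unit of work and leaves the rest queued for the next request.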

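With these, building a program that periodically fetches a list of URLs is straightforward. In the code below the timer fires every 3 minutes, and each URL is fetched at most once an hour. Here is the code.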

def getone(url):
    """Fetch one URL and remember the success for an hour."""
    try:
        result = urlfetch.fetch(url)
        if result.status_code == 200:
            # Marker key: while it exists, the URL is not fetched again.
            memcache.set(url, '1', 60 * 60)
            # process result.content
    except Exception:
        pass

def getallurl():
    # list of urls to be fetched
    urllist = ['http://www.google.com/', 'http://www.cnn.com/', 'http://www.yahoo.com', 'http://news.google.com']
    fetchlist = []
    for url in urllist:
        # Only queue URLs that were not fetched within the last hour.
        if memcache.get(url) is None:
            fetchlist.append(url)
    # this is equivalent to
    # for url in fetchlist: getone(url)
    if len(fetchlist) > 0:
        foreach('getone', fetchlist)

#register the timer callback
timer('getallurl', 3*60)

The complete code is here: http://groups.google.com/group/httpmr-discuss/t/1648611a54c01aa
I have been running this code on App Engine for a few days without much trouble.

Warning: This approach makes heavy use of urlfetch. The limit on the number of urlfetch calls per day is 160000, so be careful not to reach it.
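
As a rough check against that limit, the sketch below estimates how many urlfetch calls the keep-alive loop alone consumes. It assumes a single chain of requests where each hop sleeps for a fixed delay (2 seconds in the handlers above) and makes exactly one urlfetch; request latency is ignored, so the result is an upper bound. The function names are made up for this example.

```python
SECONDS_PER_DAY = 24 * 60 * 60
DAILY_URLFETCH_LIMIT = 160000  # figure quoted in the warning


def loop_fetches_per_day(delay_seconds):
    """Upper bound on urlfetch calls the keep-alive loop makes in a day."""
    return SECONDS_PER_DAY // delay_seconds


def remaining_budget(delay_seconds):
    """urlfetch calls left over for real work after the loop's share."""
    return DAILY_URLFETCH_LIMIT - loop_fetches_per_day(delay_seconds)


print(loop_fetches_per_day(2))  # 43200
print(remaining_budget(2))      # 116800
```

Under these assumptions the loop itself uses roughly a quarter of the daily allowance, and a shorter delay uses proportionally more.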

Awesome! I like it.
fuentesjr
I don't see how it can possibly work. Won't you exceed the 10-second quota on the 6th recursive fetch?
Constantin
Correct me if I am wrong, isn't there an AppEngine policy about inter-communication between hosted apps?
tranced_UT3
Use the new Task API or the Cron API. Do not use the approaches above.
Kinlan
+2  A: 

An upcoming version of the runtime will have some kind of periodic execution engine à la cron. See this message on the App Engine group:

So, all the SDK pieces appear to work, but my testing indicates this isn't running on the production servers yet. I set up an "every 1 minutes" cron that logs when it runs, and it hasn't been called yet.

Hard to say when this will be available, though...

zgoda
+6  A: 

You can use the Task Queue Python API (see the Google documentation). Be cautious: it is currently released as an experimental feature and is subject to change.
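
A minimal sketch of what enqueuing work might look like, assuming the SDK's `taskqueue.add(url=..., params=...)` call. The handler path `/tasks/fetch`, the `target` parameter and the `enqueue_fetches` helper are hypothetical. While the API was experimental the module was imported from `google.appengine.api.labs`, so the import line may need adjusting for the SDK version in use.

```python
def enqueue_fetches(urls, add_task=None):
    """Queue one task per URL and return the number of tasks queued.

    add_task defaults to App Engine's taskqueue.add; passing another
    callable lets the function be exercised without the SDK.
    """
    if add_task is None:
        # Imported lazily so this sketch loads outside App Engine.
        from google.appengine.api import taskqueue
        add_task = taskqueue.add
    for url in urls:
        # Each task becomes a request to the (hypothetical) worker handler.
        add_task(url='/tasks/fetch', params={'target': url})
    return len(urls)
```

Each queued task is delivered as its own request to the worker handler, so no self-fetching loop is needed to keep the work going.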

Jason Rikard