I've set up this task queue implementation on a site I host for a customer. A cron job runs "/admin/tasks/queue" each morning at 2am; that handler queues up emails to be sent by "/admin/tasks/email", and it uses cursors so that the queuing is done in small chunks. For some reason, last night /admin/tasks/queue kept getting re-run by this code, so it sent out my whole quota of emails :/. Have I done something wrong with this code?

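import logging

from google.appengine.api import memcache, taskqueue
from google.appengine.ext import webapp

# Subscriber is the app's own db.Model (its definition is not shown here).
class QueueUpEmail(webapp.RequestHandler):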
    def post(self):
        subscribers = Subscriber.all()
        subscribers.filter("verified =", True)

        last_cursor = memcache.get('daily_email_cursor')
        if last_cursor:
            subscribers.with_cursor(last_cursor)

        subs = subscribers.fetch(10)
        logging.debug("POST - subs count = %i" % len(subs))
        if len(subs) < 10:
            logging.debug("POST - Less than 10 subscribers in subs")
            # Subscribers left is less than 10, don't reschedule the task
            for sub in subs:
                task = taskqueue.Task(url='/admin/tasks/email', params={'email': sub.emailaddress, 'day': sub.day_no})
                task.add("email")
            memcache.delete('daily_email_cursor')
        else:
            logging.debug("POST - 10 or more subscribers left in subs - reschedule")
            # Batch is full (10), so there may be more subscribers: reschedule
            for sub in subs:
                task = taskqueue.Task(url='/admin/tasks/email', params={'email': sub.emailaddress, 'day': sub.day_no})
                task.add("email")
            cursor = subscribers.cursor()
            memcache.set('daily_email_cursor', cursor)
            task = taskqueue.Task(url="/admin/tasks/queue", params={})
            task.add("queueup")

A:

I can see a couple of potential problems. First, you store your cursor in memcache, which is not guaranteed to persist anything. If the cursor is evicted halfway through your processing, the next task gets a cache miss, starts the query from the beginning, and re-sends every message.
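To make the cache-miss scenario concrete, here is a small in-memory simulation, not App Engine code: a plain list stands in for the Subscriber query, an integer offset for the cursor, and a dict for memcache. The names run_chain, SUBSCRIBERS and BATCH are hypothetical. Evicting the cursor once after the first batch makes the next task start from the top again.

```python
# Hypothetical in-memory simulation: a list stands in for the Subscriber
# query, an integer offset for the datastore cursor, and a dict for memcache.
# None of this is App Engine API.

SUBSCRIBERS = ["user%d@example.com" % i for i in range(25)]
BATCH = 10


def run_chain(cache, evict_after_batch=None):
    """Run the queue-up task repeatedly, reading the cursor from `cache`.

    If `evict_after_batch` is set, the cache is cleared once after that many
    batches, mimicking a memcache eviction between two tasks.
    """
    sent = []
    batches = 0
    while True:
        start = cache.get("daily_email_cursor", 0)  # cache miss -> start from the top
        batch = SUBSCRIBERS[start:start + BATCH]
        sent.extend(batch)  # one email task per subscriber in the batch
        batches += 1
        if len(batch) < BATCH:
            cache.pop("daily_email_cursor", None)  # last batch: stop the chain
            return sent
        cache["daily_email_cursor"] = start + BATCH
        if batches == evict_after_batch:
            cache.clear()  # the cursor silently disappears
```

With 25 subscribers, the undisturbed chain sends 25 messages, while a single eviction after the first batch sends 35, ten of them duplicates. If the cursor were lost repeatedly, the chain could keep restarting from the beginning.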

Secondly, tasks will be retried if they fail for any reason, so they should be designed to be idempotent. For sending emails, of course, this is nearly impossible: once a message is sent it can't be rolled back if your task then dies for some other reason. At a minimum, I'd recommend updating a "last emailed date" field on each Subscriber entity after sending them the message, and skipping subscribers whose date is already today. This isn't foolproof either, since the email send could succeed and the subsequent update of the entity could fail. It would also add overhead to the whole process, since you'd be doing a datastore write for each subscriber.
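A minimal sketch of the "last emailed date" check, again simulated in plain Python rather than against the datastore: FakeSubscriber, its last_emailed field, send_daily_email and the outbox list are all hypothetical names, and appending to outbox stands in for the real mail call. The fail_before_save flag reproduces the gap described above, where the send succeeds but the update does not.

```python
import datetime


class FakeSubscriber(object):
    """Hypothetical stand-in for the Subscriber entity, with an added
    last_emailed field that is not in the original model."""

    def __init__(self, email):
        self.emailaddress = email
        self.last_emailed = None


outbox = []  # records every message "sent"


def send_daily_email(sub, today, fail_before_save=False):
    """Email task body: skip subscribers already emailed today."""
    if sub.last_emailed == today:
        return False  # a retried task finds the marker and does nothing
    outbox.append(sub.emailaddress)  # stand-in for the real mail send
    if fail_before_save:
        # Simulates the task dying after the send but before the write.
        raise RuntimeError("task died after sending, before saving")
    sub.last_emailed = today  # in App Engine this would be followed by sub.put()
    return True
```

A retried task for a subscriber who was already recorded does nothing, but a task that dies between sending and saving will still send twice on retry.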

Wooble
Thanks for your analysis; my initial thought was that memcache could be the issue.
Peter Farmer
Memcache was probably the proximate cause behind the issue you had, yes. Your best approach would be to pass the cursor as an argument from each task to the next.
Nick Johnson
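A sketch of passing the cursor from task to task, simulated in plain Python: queue_up, run and the deque-based task queue are hypothetical stand-ins, and the cursor is an integer offset rather than a real datastore cursor string. In App Engine the handler would presumably add the next task with something like taskqueue.add(url='/admin/tasks/queue', params={'cursor': cursor}) and, when a cursor is present, resume with subscribers.with_cursor(self.request.get('cursor')).

```python
from collections import deque

SUBSCRIBERS = ["user%d@example.com" % i for i in range(25)]
BATCH = 10


def queue_up(params, task_queue, email_queue):
    """Hypothetical queue-up handler: the cursor arrives in the task's own
    params instead of memcache, so no shared cache can evict it."""
    start = int(params.get("cursor", 0))  # the first (cron) run has no cursor
    batch = SUBSCRIBERS[start:start + BATCH]
    email_queue.extend(batch)  # one email task per subscriber
    if len(batch) == BATCH:
        # Hand the next position to the next task in the chain.
        task_queue.append({"cursor": str(start + BATCH)})


def run(initial_params=None):
    """Drain the simulated task queue, starting from the cron-triggered task."""
    tasks = deque([initial_params or {}])
    emails = []
    while tasks:
        queue_up(tasks.popleft(), tasks, emails)
    return emails
```

Because each task carries its own position, nothing shared can reset the chain to the beginning. Retries are still possible, though: a retried queue-up task would re-enqueue its own batch, so the per-subscriber "last emailed date" check suggested in the answer would still be needed to stop those duplicates.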