
I'm using the following approach to handle a FIFO queue based on Google App Engine db.Model (see this question).

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

This queue works as expected (very good).

Right now my code has a method, invoked through the deferred queue, that accesses this FIFO queue:

def deferred_worker():
  data = QueueItem.pop()  # the oldest QueueItem entity, or None if the queue is empty
  do_something_with(data)

I would like to enhance this method and the queue data structure by adding a client_ID parameter representing a specific client that needs to access its own queue. Something like:

def deferred_worker(client_ID):
  data = QueueItem_of_this_client_ID.pop()  # I need to implement this
  do_something_with(data)

How could I code the Queue to be client_ID aware?

Constraints:
- The number of clients is dynamic and not predefined
- Taskqueue is not an option: (1) there is a maximum of ten queues, and (2) I would like to have full control over my queue

Do you know how I could add this behaviour using the new Namespaces API? (Remember that I'm not calling the db.Model from a webapp.RequestHandler.)
Another option: I could add a client_ID db.StringProperty to QueueItem and use it as a filter in the pop method:

QueueItem.all(keys_only=True).filter('client_ID =', an_ID).order('created').fetch(10)

Any better idea?
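To make the client_ID-filter option concrete, here is a minimal in-memory sketch of the same pop logic (fetch the oldest candidates, then try to claim them one at a time), restricted to a single client. It does not use the App Engine API: a dict stands in for the datastore, a threading.Lock stands in for db.run_in_transaction, and the class name ClientQueue is hypothetical. On the real datastore, an equality filter on client_ID combined with an order on created would, as far as I know, need a composite index on (client_ID, created) in index.yaml.

```python
import itertools
import threading


class ClientQueue(object):
    """In-memory stand-in for a QueueItem kind that has a client_ID property."""

    def __init__(self):
        self._items = {}                   # key -> (created, client_id, data)
        self._created = itertools.count()  # increasing value plays the role of 'created'
        self._lock = threading.Lock()      # plays the role of db.run_in_transaction

    def push(self, client_id, data):
        with self._lock:
            key = next(self._created)      # hypothetical key: unique per item
            self._items[key] = (key, client_id, data)
        return key

    def _candidate_keys(self, client_id, limit=10):
        # Mirrors QueueItem.all(keys_only=True).filter('client_ID =', client_id)
        #                      .order('created').fetch(limit)
        with self._lock:
            matching = sorted((created, key)
                              for key, (created, cid, _) in self._items.items()
                              if cid == client_id)
        return [key for _, key in matching[:limit]]

    def _tx_pop(self, key):
        # Claim the item; returns None if another worker already took it.
        with self._lock:
            return self._items.pop(key, None)

    def pop(self, client_id):
        """Pop the oldest item for client_id, or return None if there is none."""
        while True:
            keys = self._candidate_keys(client_id)
            if not keys:
                return None
            for key in keys:
                item = self._tx_pop(key)
                if item is not None:
                    return item[2]  # the data payload
```

Because each pop only ever looks at one client's items, clients can appear and disappear at runtime without being declared up front, which fits the "number of clients is dynamic" constraint.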

A:

Assuming your "client class" is really a request handler the client calls, you could do something like this:

from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace
from google.appengine.ext import webapp

class ClientClass(webapp.RequestHandler):
  def get(self):
    # For this example let's assume the user_id is your unique id.
    # You could just as easily use a parameter you are passed.
    user = users.get_current_user()
    if user:
      # If there is a user, use their queue. Otherwise use the global queue.
      set_namespace(user.user_id())

    item = QueueItem.pop()
    self.response.out.write(str(item))

    QueueItem.push('The next task.')
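This works because every datastore call reads the request's current namespace, so the same QueueItem kind effectively holds a separate queue per namespace. The following is a minimal in-memory model of that behaviour, not the App Engine implementation: NamespacedQueue, its set_namespace method, and the dict-of-deques storage are hypothetical stand-ins.

```python
import collections


class NamespacedQueue(object):
    """Toy model: one FIFO per namespace, selected by a 'current namespace'."""

    def __init__(self):
        self._queues = collections.defaultdict(collections.deque)
        self._namespace = ""  # "" plays the role of the default namespace

    def set_namespace(self, namespace):
        self._namespace = namespace

    def push(self, data):
        # Writes always go to the queue of the current namespace.
        self._queues[self._namespace].append(data)

    def pop(self):
        # Reads only ever see the queue of the current namespace.
        queue = self._queues[self._namespace]
        return queue.popleft() if queue else None
```

Note that push and pop never receive the user id; switching the namespace is what changes which queue they operate on, mirroring the role of set_namespace(user.user_id()) in the handler above.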

Alternatively, you could also set the namespace app-wide.
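One way to do that on the Python runtime is the namespace_manager_default_namespace_for_request hook in appengine_config.py, which the runtime calls when the request has not set a namespace itself. The sketch below assumes a hypothetical X-Client-ID request header as the source of the client id, and assumes the CGI-style environment where request headers appear in os.environ as HTTP_* variables; how you actually identify clients is up to you.

```python
# appengine_config.py (sketch)
import os
import re

# Namespace names may contain only letters, digits, '.', '_' and '-' (max 100 chars).
_VALID_NAMESPACE = re.compile(r'[0-9A-Za-z._-]{0,100}\Z')


def namespace_manager_default_namespace_for_request():
    """Return the namespace to use when none was set explicitly.

    Hypothetical policy: clients identify themselves with an X-Client-ID
    header, exposed to the application as HTTP_X_CLIENT_ID.
    """
    client_id = os.environ.get('HTTP_X_CLIENT_ID', '')
    if _VALID_NAMESPACE.match(client_id):
        return client_id
    return ''  # invalid or missing id: fall back to the default namespace
```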

By setting the default namespace, all calls to the datastore will be "within" that namespace unless you explicitly specify otherwise. Just note that to fetch and run tasks you'll have to know the namespace, so you probably want to maintain a list of namespaces in the default namespace for cleanup purposes.
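Without such a list, a cleanup job has no way of knowing which per-client queues exist. Here is a minimal in-memory sketch of that bookkeeping: record each namespace in a registry (which on App Engine would live in the default namespace) whenever an item is pushed, then iterate over the registry to drain every queue. QueueRegistry, push, and drain_all are hypothetical names, and plain Python containers stand in for the datastore.

```python
import collections


class QueueRegistry(object):
    """Toy model: per-namespace queues plus a registry of known namespaces."""

    def __init__(self):
        self._queues = collections.defaultdict(collections.deque)
        # Stand-in for a small kind kept in the default namespace that records
        # every namespace which has ever received a queue item.
        self._known_namespaces = set()

    def push(self, namespace, data):
        self._known_namespaces.add(namespace)  # register before writing the item
        self._queues[namespace].append(data)

    def drain_all(self, handle):
        """Visit every registered namespace and process its queue in FIFO order."""
        processed = 0
        for namespace in sorted(self._known_namespaces):
            queue = self._queues[namespace]
            while queue:
                handle(namespace, queue.popleft())
                processed += 1
        return processed
```

Registering the namespace before writing the item means a crash between the two steps leaves at worst an empty registered queue, never an unregistered one.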

Robert Kluin
@Robert Don't I have to call set_namespace(default_namespace) to restore the default after the QueueItem.pop() statement? Isn't it possible that a concurrent task doing other db operations could use the wrong namespace? When you call set_namespace(user.user_id()), is it set globally for the whole application or just for the webapp.RequestHandler thread?
systempuntoout
No. If you set the namespace as suggested, set_namespace applies only to that request. The code underlying the API reads the current namespace as needed.
Robert Kluin
@Robert If you look at the Namespaces API page, there's an example that uses a finally block to restore the saved namespace. Why?
systempuntoout
I assume you are talking about the 'In the Datastore' (http://code.google.com/appengine/docs/python/multitenancy/multitenancy.html#Using_Namespaces_with_the_Datastore) example. It runs the request in a different namespace and updates the counter both within the current namespace *and* within the "-global-" namespace. The finally block makes sure that, if the call to update the "-global-" counter fails, they get back to the "correct" namespace; otherwise any further API calls would be made within the "-global-" namespace.
Robert Kluin
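The save, switch, and restore-in-finally pattern described above can be packaged as a context manager. This is a sketch, not part of the SDK: scoped_namespace is a hypothetical helper that takes the getter and setter as arguments (on App Engine you would pass namespace_manager.get_namespace and namespace_manager.set_namespace), so it can be exercised without the SDK.

```python
import contextlib


@contextlib.contextmanager
def scoped_namespace(namespace, get_namespace, set_namespace):
    """Run a block in `namespace`, then restore whatever namespace was current."""
    saved = get_namespace()
    set_namespace(namespace)
    try:
        yield
    finally:
        # Restore even if the block raised, so later API calls are not
        # left running in the temporary namespace.
        set_namespace(saved)
```

Usage would look like `with scoped_namespace('-global-', get_ns, set_ns): update_counter()`, where update_counter is a placeholder for the work done in the other namespace.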
@Robert As I said in the question, I'm using the deferred queue, not a web handler, and from what I read in the docs, set_namespace(namespace) sets the namespace for the current HTTP request.
systempuntoout
You deferred the task from somewhere, right? deferred uses the task queue on the back end, and the task queue should preserve the namespace.
Robert Kluin
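In other words, a deferred task should run in the namespace that was active when it was enqueued, so the worker's QueueItem.pop() reads that client's queue. The sketch below only models that behaviour in plain Python; NamespaceAwareTaskQueue, defer, and run_next are hypothetical and do not reproduce how App Engine actually carries the namespace over to the task.

```python
import collections


class NamespaceAwareTaskQueue(object):
    """Toy model of a task queue that remembers the enqueuing namespace."""

    def __init__(self):
        self.current_namespace = ""
        self._tasks = collections.deque()

    def defer(self, func, *args):
        # Capture the namespace that is current at enqueue time.
        self._tasks.append((self.current_namespace, func, args))

    def run_next(self):
        namespace, func, args = self._tasks.popleft()
        previous = self.current_namespace
        self.current_namespace = namespace  # run with the enqueue-time namespace
        try:
            return func(*args)
        finally:
            self.current_namespace = previous
```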
A:

As I said in response to your query on my original answer, you don't need to do anything to make this work with namespaces: the datastore, on which the queue is built, already supports namespaces. Just set the namespace as desired, as described in the docs.

Nick Johnson
@Nick Sorry, I don't get it. When I call set_namespace(..), is it set globally for the whole application? Could the scope of this call cause concurrency problems by setting the wrong namespace for other concurrent calls?
systempuntoout
It's set globally for the current request. In any case, the Python runtime is single-threaded, so concurrent requests are a non-issue.
Nick Johnson