Suppose you're running Django on Linux, and you've got a view that you want to return the output of a subprocess called cmd, which operates on a file that the view creates, for example like so:

    import subprocess
    import tempfile

    from django.http import HttpResponse

    def call_subprocess(request):
        response = HttpResponse()

        with tempfile.NamedTemporaryFile("w") as f:
            f.write(request.GET['data']) # i.e. some data
            f.flush()

            # cmd operates on f.name and returns output
            p = subprocess.Popen(["cmd", f.name],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)

            out, err = p.communicate()

        response.write(out) # would be text/plain...
        return response

Now, suppose cmd has a very slow start-up time, but a very fast operating time, and it does not natively have a daemon mode. I would like to improve the response-time of this view.

I would like to make the whole system run much faster by starting up a number of instances of cmd in a worker pool, having them wait for input, and having *call_process* ask one of those worker-pool processes to handle the data.

This is really 2 parts:

Part 1. A function that calls cmd, with cmd waiting for input. This could be done with pipes, i.e.:

import os

def _run_subcmd(fname):
    p = subprocess.Popen(["cmd", fname],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "w")
    o.write(out)
    o.close()
    os._exit(0)  # exit the forked child

def _run_cmd(data):
    fifo_name = tempfile.mktemp()   # name for a named pipe
    os.mkfifo(fifo_name)

    if os.fork() == 0:
        _run_subcmd(fifo_name)      # child: run cmd, reading from the pipe
    else:
        f = open(fifo_name, "w")    # parent: feed the data through the pipe
        f.write(data)
        f.close()
        os.wait()                   # wait for the child to finish

    # read 'out' from the tmp file
    r = open("out.txt", "r")
    out = r.read()
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

Part 2. A set of workers running in the background that are waiting on the data. That is, we want to extend the above so that the subprocess is already running: e.g. when the Django instance initializes, or when *call_process* is first called, a set of these workers is created.

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(self, index):
        self.tmp_name = tempfile.mktemp() # get a tmp file name
        os.mkfifo(self.tmp_name)          # make it a named pipe
        self.p = subprocess.Popen(["cmd", self.tmp_name],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(self, out_filename, data):
        WORKERS[self.index] = None # qua-mutex??
        fifo = open(self.tmp_name, "w")
        fifo.write(data)           # feed the data to cmd via the pipe
        fifo.close()
        if os.fork() == 0: # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            o.close()
            os._exit(0)

        WORKERS[self.index] = Worker(self.index) # replace this one
        return out_filename

    @classmethod
    def get_worker(cls): # get the next worker
        # ... static, incrementing index
        pass

There should be some initialization of workers somewhere, like this:

def init_workers(): # create WORKER_COUNT workers
    for i in xrange(WORKER_COUNT):
        WORKERS.append(Worker(i))

Now, what I have above becomes something like so:

def _run_cmd(data):
    worker = Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

    out_tmp = tempfile.NamedTemporaryFile("r") # this stores output of cmd

    worker.run(out_tmp.name, data)
    # please ignore the fact that everything will be
    # appended to out.txt ... these will be tmp files, too, but named elsewhere.

    # read 'out' from the tmp file
    out = out_tmp.read()
    return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

Now, the questions:

  1. Will this work? (I've just typed this off the top of my head into StackOverflow, so I'm sure there are problems, but conceptually, will it work?)

  2. What are the problems to look for?

  3. Are there better alternatives to this? e.g. Could threads work just as well (it's Debian Lenny Linux)? Are there any libraries that handle parallel process worker-pools like this? (There's a rough sketch of what I mean just after this list.)

  4. Are there interactions with Django that I ought to be conscious of?
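
Regarding #3, here is a rough sketch of the kind of thing I'm imagining, using the stdlib multiprocessing module (new in Python 2.6); run_on_cmd is a made-up wrapper, and note that on its own this only spreads requests across processes - it doesn't avoid cmd's start-up cost unless each worker could somehow keep its cmd instance alive:

    import subprocess
    import tempfile
    from multiprocessing import Pool

    def run_on_cmd(data):
        # hypothetical wrapper: write data to a tmp file, run cmd on it,
        # and return cmd's stdout (still pays cmd's start-up cost per call)
        f = tempfile.NamedTemporaryFile("w")
        f.write(data)
        f.flush()
        p = subprocess.Popen(["cmd", f.name],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = p.communicate()
        f.close()
        return out

    pool = Pool(processes=6)  # i.e. WORKER_COUNT from above

    def call_process_data(data):
        # hand the work to one of the pool's worker processes and block
        return pool.apply(run_on_cmd, (data,))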

Thanks for reading! I hope you find this as interesting a problem as I do.

Brian

A: 

How about "daemonizing" the subprocess call using python-daemon or its successor, grizzled?
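
For example, a minimal sketch of the idea (the FIFO path and handle_request are made up for illustration, and this only helps with the start-up cost if cmd can be kept resident inside the daemon):

    import os
    import subprocess
    import daemon

    REQUEST_FIFO = "/tmp/cmd_requests"  # hypothetical path

    def handle_request(fname):
        # placeholder: hand fname to cmd; a real daemon would keep cmd
        # running and feed it work instead of re-launching it each time
        p = subprocess.Popen(["cmd", fname],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = p.communicate()
        open(fname + ".out", "w").write(out)

    def serve_requests():
        # block on the FIFO; each line names a file for cmd to process
        while True:
            with open(REQUEST_FIFO) as fifo:
                for fname in fifo:
                    handle_request(fname.strip())

    if not os.path.exists(REQUEST_FIFO):
        os.mkfifo(REQUEST_FIFO)

    with daemon.DaemonContext():
        serve_requests()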

John Paulett
Unrelated, but what does it do better? I've had some problems related to python-daemon, but I'm inherently skeptical towards collection libraries.
asksol
+3  A: 

It may seem like I am punting this product, as this is the second time I have responded with a recommendation of it.

But it seems like you need a Message Queuing service, in particular a distributed message queue.

Here is how it will work:

  1. Your Django App requests CMD
  2. CMD gets added to a queue
  3. CMD gets pushed to several workers
  4. It is executed and the results are returned upstream

Most of this code exists, and you don't have to go about building your own system.

Have a look at Celery, which was initially built for use with Django.

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/
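
For example (a rough sketch only; the task name is made up, and the Task-subclass style follows the example in the next answer):

    import subprocess
    import tempfile

    from celery.task import Task
    from celery.registry import tasks

    class RunCmdTask(Task):
        # hypothetical task: a worker writes the data to a tmp file,
        # runs cmd on it, and returns cmd's stdout as the result
        def run(self, data):
            f = tempfile.NamedTemporaryFile("w")
            f.write(data)
            f.flush()
            p = subprocess.Popen(["cmd", f.name],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            out, err = p.communicate()
            f.close()
            return out

    tasks.register(RunCmdTask)

    def call_process_via_celery(data):
        # step 2: push onto the queue; then block until a worker returns
        return RunCmdTask.delay(data).get()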

izzy
This is interesting - I'll be looking into it. However, the problem I may (or may not) have is that part 1 of step #4 ("it is executed", i.e. LaTeX is started) must happen before #2 ("CMD gets added to the queue", i.e. LaTeX gets data). That said, I'm pretty confident that Celery can do this, but it'll require a bit of delving.
Brian M. Hunt
+2  A: 

Issy already mentioned Celery, but since comments don't work well with code samples, I'll reply as an answer instead.

You should try to use Celery synchronously with the AMQP result store. You could distribute the actual execution to another process or even another machine. Executing synchronously in Celery is easy, e.g.:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval
4

The AMQP result store makes sending back the result very fast, but it's only available in the current development version (in code-freeze to become 0.8.0).
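
If it helps, the result backend is chosen via a Django setting; in later releases the name is CELERY_RESULT_BACKEND (the exact setting name in the 0.8 development version may differ, so check the docs for your version):

    # settings.py  (assumed setting name; verify against your Celery version)
    CELERY_RESULT_BACKEND = "amqp"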

asksol
Thanks, Asksol. One requirement is to have the Task run forever as a daemon, and then just send/receive data to/from it. LaTeX has to be running before you call run() (otherwise you have to wait for LaTeX to start up, which defeats the entire purpose of using a task queue). I'm looking into Celery to see if it can do this (I expect it can).
Brian M. Hunt
I don't see the requirement that LaTeX has to be running, apart from it being an optimization? To do this you would have to use the LaTeX C API (or whatever it is) to run it embedded inside a worker process. This should be possible, but would require you to customize Celery considerably. It might be a good starting point, as it already solves parts of your problem. I was not saying the task queue is a good fit for this, but the distributed/parallel processing part *might* be. You want a task pool and you want to send/receive results; you just want the worker processes to be LaTeX processors.
asksol