I am writing a server program with one producer and multiple consumers. What confuses me is that only the first task the producer puts into the queue gets consumed; tasks enqueued after that are never consumed and remain in the queue forever.

from multiprocessing import Process, Queue, cpu_count
from http import httpserv  # the WSGI producer shown in the edit below
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()

Manager().start()

The producer is an HTTP server which puts a task into the queue whenever it receives a request from a user. It seems that the consumer processes remain blocked even when there are new tasks in the queue, which is weird.

P.S. Two more questions, not related to the above: I am not sure whether it is better to put the HTTP server in its own process rather than in the main process; if so, how can I keep the main process running until all child processes end? Second question: what is the best way to stop the HTTP server gracefully?

Edit: added the producer code, it's just a simple Python WSGI server:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
+2  A: 

I think there must be something wrong with the web server part, as this works perfectly:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()


Manager().start()

Sample output:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
Luper Rouch
+2  A: 

"Second question, what's the best way to stop the HTTP server gracefully?"

This is hard.

You have two choices for Interprocess Communication:

  • Out-of-band controls. The server has another mechanism for communication: another socket, a Unix signal, or something else. The something else could be a "stop-now" file in the server's local directory. That seems odd, but it works well and is simpler than introducing a select loop to listen on multiple sockets or a signal handler to catch a Unix signal.

    The "stop-now" file is easy to implement. The evwsgi.run() loop merely checks for this file after each request. To make the server stop, you create the file, execute a /control request (which will get a 500 error or something, it doesn't really matter) and the server should grind to a halt. Remember to delete the stop-now file, otherwise your server won't restart.

  • In-band controls. The server has another URL (/stop) which will stop it. Superficially, this seems like a security nightmare, but it depends entirely on where and how this server will be used. Since it appears to be a simple wrapper around an internal request queue, this extra URL works well.

    To make this work, you need to write your own version of evwsgi.run() that can be terminated by setting some variable in a way that breaks out of the loop. A rough sketch of both control styles follows this list.
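
A minimal sketch of both control styles, using the stdlib wsgiref server rather than fapws purely so that the request loop is something we own; evwsgi.run() itself would need equivalent hooks patched in. The "stop-now" file name, the /stop URL, and serve_until_stopped() are illustrative, not part of any real API:

import os
from wsgiref.simple_server import make_server

STOP_FILE = "stop-now"

def serve_until_stopped(queue):
    state = {"stop": False}              # in-band flag, flipped by the /stop URL

    def app(environ, start_response):
        if environ["PATH_INFO"] == "/stop":
            state["stop"] = True         # in-band control: ask the loop to end
        else:
            queue.put(environ["PATH_INFO"].lstrip("/"))  # enqueue a task as before
        start_response("200 OK", [("Content-Type", "text/plain")])
        return ["ok\n"]

    httpd = make_server("0.0.0.0", 8080, app)
    # Re-check both controls after every handled request, as described above.
    while not state["stop"] and not os.path.exists(STOP_FILE):
        httpd.handle_request()           # blocks until one request has been handled
    queue.put(None)                      # propagate the shutdown sentinel to the workers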

Edit

You probably don't want to terminate your server, since you don't know the state of its workers. You need to signal the server, and then wait until it finishes things normally.

If you want to forcibly kill the server, then os.kill() (or multiprocessing.Process.terminate()) will work. Except, of course, you don't know what the child processes were doing.
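
For the forcible path, a minimal sketch, assuming the Manager from the question is reworked so that httpserv() runs in its own Process; server_proc, workers, and queue below are hypothetical names for that layout:

def shutdown(server_proc, workers, queue, graceful=True):
    if graceful:
        server_proc.join()          # wait for the server loop to exit on its own
    else:
        server_proc.terminate()     # sends SIGTERM; the server gets no chance to clean up
        server_proc.join()
    queue.put(None)                 # work() re-puts the sentinel, so one None reaches every worker
    for w in workers:
        w.join()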

S.Lott
How about putting the server in its own process and using the multiprocessing.Process.terminate method to terminate it? That seems easier.
btw0