views: 409
answers: 2
I'm trying to figure out how to write a program in Python that uses a multiprocessing queue.

I have multiple servers and one of them will provide the queue remotely with this:

from multiprocessing.managers import BaseManager
import Queue
import daemonme

queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

# run as a daemon and expose the queue remotely under the name 'get_job'
daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda: queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()

Now I want to use my dual Xeon, quad-core server to process jobs off of this remote queue. The jobs are totally independent of one another. So if I have 8 cores, I'd like to start 7 processes that each pick a job off the queue, process it, then go back for the next one. Each of the 7 processes will do this, but I can't quite wrap my head around the structure of this program.

Can anyone provide me some educated ideas about the basic structure of this?

Thank you in advance.

A: 

You should use the master/slave (a.k.a. farmer/worker) pattern. The initial process acts as the master and creates the jobs. It:

  1. creates a Queue
  2. creates 7 slave processes, passing the queue as a parameter
  3. starts writing jobs into the queue

The slave processes continuously read from the queue, and perform the jobs (perhaps until they receive a stop message from the queue). There is no need to use Manager objects in this scenario, AFAICT.
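A minimal sketch of that structure on a single machine (the job values, NUM_WORKERS, the None stop sentinel, and process_job are illustrative placeholders, not from the answer itself):

from multiprocessing import Process, Queue

NUM_WORKERS = 7
STOP = None  # sentinel telling a worker to exit

def process_job(job):
    print(job)  # placeholder for the real work

def worker(queue):
    # keep pulling jobs until the stop message arrives
    while True:
        job = queue.get()
        if job is STOP:
            break
        process_job(job)

if __name__ == '__main__':
    queue = Queue()                                    # 1. create a Queue
    workers = [Process(target=worker, args=(queue,))   # 2. create 7 slave processes
               for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()
    for job in range(100):                             # 3. write jobs into the queue
        queue.put(job)
    for _ in workers:
        queue.put(STOP)  # one stop message per worker
    for w in workers:
        w.join()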

Martin v. Löwis
How would your implementation manage *remote* queues? I think multiprocessing.managers is a really good choice if he needs to share resources remotely.
AlberT
+2  A: 

Look at the docs for how to retrieve a queue from the manager (paragraph 17.6.2.7), then, with a pool of workers (paragraph 17.6.2.9), launch 7 jobs, passing the queue to each one.
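A rough sketch of that first suggestion, here using 7 plain Process workers rather than a Pool; it assumes the server from the question is already running and registered the queue under 'get_job', and 'servername', the empty authkey, do_work, and the None stop sentinel are placeholders:

from multiprocessing import Process
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# the server in the question registered the queue as 'get_job'
QueueManager.register('get_job')

def do_work(job):
    print(job)  # placeholder for the real processing

def worker(queue):
    # pull jobs off the shared queue proxy until a None sentinel arrives
    while True:
        job = queue.get()
        if job is None:
            break
        do_work(job)

if __name__ == '__main__':
    m = QueueManager(address=('servername', 50000), authkey='')
    m.connect()
    queue = m.get_job()   # retrieve the queue proxy from the manager
    workers = [Process(target=worker, args=(queue,)) for _ in range(7)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()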

Alternatively, you can structure it like a producer/consumer problem:

from multiprocessing.managers import BaseManager
import random
import time

class Producer():
    def __init__(self):
        # connect to the remote manager and fetch the shared queue proxy
        BaseManager.register('queue')
        self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
        self.m.connect()
        self.cm_queue = self.m.queue()
        while 1:
            time.sleep(random.randint(1, 3))
            self.cm_queue.put(<PUT-HERE-JOBS>)

from multiprocessing.managers import BaseManager
import time
import random

class Consumer():
    def __init__(self):
        # connect to the same manager and pull jobs off the shared queue
        BaseManager.register('queue')

        self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
        self.m.connect()
        self.queue = self.m.queue()
        while 1:
            <EXECUTE(job = self.queue.get())>


from multiprocessing.managers import BaseManager
import Queue

class Manager():

    def __init__(self):
        # serve a shared queue under the name 'queue' so the producer and
        # consumers above can retrieve it remotely
        self.queue = Queue.Queue()

        BaseManager.register('queue', callable=lambda: self.queue)

        self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
        self.s = self.m.get_server()

        self.s.serve_forever()
DrFalk3n
I have this working (thank you). What I need to know is: in your <EXECUTE(job = self.queue.get())> section, what is the best way to process these jobs? They're all Python files, so would it be best to run them as a module? Or should they be run as a separate Python process with the subprocess module?
WeWatchYourWebsite
Look at the os module and its exec* functions, for example.
DrFalk3n
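If the jobs pulled off the queue are paths to Python files, one option is the subprocess module the asker mentions rather than os.exec*, since the exec* calls replace the worker process itself. A hedged sketch, where execute() is a hypothetical helper standing in for the <EXECUTE(...)> placeholder:

import subprocess
import sys

def execute(job):
    # assume 'job' is the path of a Python script taken from the queue;
    # run it in a separate interpreter process and wait for it to finish
    ret = subprocess.call([sys.executable, job])
    if ret != 0:
        print('job %s exited with code %d' % (job, ret))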