I have a question concerning Python multiprocessing. I am trying to take a dataset, break it into chunks, and pass those chunks to concurrently running processes. I need to transform large tables of data using simple calculations (e.g. electrical resistance -> temperature for a thermistor).

The code listed below almost works as desired, but it doesn't seem to be spawning any new processes (or, if so, only one at a time). I am new to Python, so there is probably quite a simple solution to this problem.

Thanks in advance!

import datetime
import math
from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()

if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')

    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')
A: 

You haven't overridden the run method. There are two ways to have a process (or thread) execute code:

  1. Create a process specifying target
  2. Subclass the process, overriding the run method.

Overriding __init__ just means your process is all dressed up with nowhere to go. __init__ should be used to give the process the attributes it needs to do its job, but it shouldn't specify the job itself. Both ways are sketched below.
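For illustration, here is a minimal sketch of both ways; the names transform and TransformWorker are mine, and the calculation just mirrors the toy one in the question:

from multiprocessing import Process

def transform(data):
    # way 1: a plain function passed as target=
    print([(x * 2) / 3 for x in data])

class TransformWorker(Process):
    # way 2: subclass Process and override run(); start() arranges
    # for run() to execute in the new process
    def __init__(self, data):
        super(TransformWorker, self).__init__()
        self.data = data

    def run(self):
        print([(x * 2) / 3 for x in self.data])

if __name__ == '__main__':
    p1 = Process(target=transform, args=([1, 2, 3],))
    p2 = TransformWorker([4, 5, 6])
    p1.start()
    p2.start()
    p1.join()
    p2.join()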

In your code, all the heavy lifting is done in this line:

exec('worker'+str(i)+' = Worker(tmp)')

and nothing is done here:

exec('worker'+str(i)+'.start()')

So checking the results with exec('print worker'+str(i)+'.result[0]') does print something meaningful, but only because the code you wanted executed has already run, at process construction rather than at process start.

Try this:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

EDIT

Okay...so I was just flying based on my threading instincts here, and they were all wrong. What we both didn't understand about processes is that you can't directly share variables: whatever you pass to a new process is read, copied, and gone forever, unless you use one of the two standard ways to share data, queues and pipes. I've played around a little bit trying to get your code to work, but so far no luck. I think queues will put you on the right track; a sketch follows.
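Here is a minimal sketch of the queue approach, assuming the same toy transform as above; the two chunks are stand-ins for the real chunked dataset:

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, data, results):
        super(Worker, self).__init__()
        self.data = data
        self.results = results  # multiprocessing.Queue shared with the parent

    def run(self):
        # runs in the child; a plain self.result attribute would be
        # invisible to the parent, so send the chunk back via the queue
        self.results.put([(x * 2) / 3 for x in self.data])

if __name__ == '__main__':
    results = Queue()
    chunks = [range(0, 5), range(5, 10)]  # toy chunks, not the real dataset
    workers = [Worker(chunk, results) for chunk in chunks]
    for w in workers:
        w.start()
    for w in workers:
        print(results.get())  # drain the queue before joining
    for w in workers:
        w.join()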

David Berger
Thanks for the reply! However, now Python is throwing an 'IndexError: list index out of range' at runtime. Further inspection shows that workerX.result is an empty list. It seems that I am still missing something with the multiprocessing library.
swilly
Remove self.result = [] from __init__. If you get an AttributeError, then the problem is with calling the subprocess. If you get an IndexError, the problem is with populating the dataset. Try putting a print statement in run.
David Berger
A: 
swilly
Yes, using a queue (or pipes) incurs serialization costs; it will be slower, but it is the correct way of doing things.
jnoller
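For completeness, a minimal sketch of the pipe alternative mentioned above; transform is an illustrative name, and the result is still serialized on its way through the pipe:

from multiprocessing import Process, Pipe

def transform(conn, data):
    # child process: compute, send the result back, close its end
    conn.send([(x * 2) / 3 for x in data])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=transform, args=(child_conn, range(10)))
    p.start()
    print(parent_conn.recv())  # blocks until the child sends
    p.join()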
A: 

No need to send the number of chunks to each process; just use get_nowait() and handle the eventual Queue.Empty exception. Every process will get a different amount of CPU time, and this should keep them all busy.

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()
petercable