I have a list of starting data. I want to apply a function to the starting data that creates a few pieces of new data for each element in the starting data. Some pieces of the new data are duplicates of each other, and I want to remove them.

The sequential version is essentially:

def create_new_data_for(datum):
    """make a list of new data from some old datum"""
    return [datum.modified_copy(k) for k in datum.k_list]

data = [...] #some list of data to start with

#generate a list of new data from the old data, we'll reduce it next
newdata = []
for d in data:
    newdata.extend(create_new_data_for(d))

#now reduce the data under ".matches(other)"
reduced = []
for d in newdata:
    for seen in reduced:
        if d.matches(seen):
            break
    else:
        #so we haven't seen anything like d yet
        reduced.append(d)

#now reduced is finished and is what we want!

I want to speed this up with multiprocessing.

I was thinking that I could use a multiprocessing.Queue for the generation. Each process would put the stuff it creates onto the queue, and the processes doing the reduction could just get the data from the Queue.
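
For the generation half, something like this minimal sketch is what I have in mind (the worker count of 4 is arbitrary, and I'm assuming my data objects are picklable so they can pass through a Queue):

from multiprocessing import Process, Queue

def generator(in_q, out_q):
    #pull old data off in_q, push each generated piece onto out_q
    for datum in iter(in_q.get, None): #None is the stop sentinel
        for piece in create_new_data_for(datum):
            out_q.put(piece)
    out_q.put(None) #tell the consumer this worker is finished

if __name__ == '__main__':
    in_q, out_q = Queue(), Queue()
    workers = [Process(target=generator, args=(in_q, out_q)) for _ in range(4)]
    for w in workers:
        w.start()
    for d in data:
        in_q.put(d)
    for _ in workers:
        in_q.put(None) #one stop sentinel per worker
    #drain out_q in the parent until every worker has signalled it is done;
    #this yields the same (unreduced) newdata as the sequential loop above
    newdata, finished = [], 0
    while finished < len(workers):
        piece = out_q.get()
        if piece is None:
            finished += 1
        else:
            newdata.append(piece)
    for w in workers:
        w.join()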

But I'm not sure how to have the different processes loop over reduced and modify it without race conditions or other issues.

What is the best way to do this safely? Or is there a better way to accomplish this goal?

I would use a multiprocessing.Lock (similar to a threading.Lock), which is provided in the standard library.

Here's an example from the standard library documentation:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print('hello world', i) #only one process is inside this section at a time
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
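
A Lock can also be used as a context manager, which releases it for you even if the critical section raises:

def f(l, i):
    with l: #acquired on entry, released on exit, even on an exception
        print('hello world', i)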

To avoid race conditions, be sure to call "mylock.acquire()" before doing any modification, and "mylock.release()" when you're done.
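
Applied to your reduce step, it could look something like the sketch below. A plain Python list isn't visible across processes, so this pairs the Lock with a Manager list; the name reduce_worker and the worker count of 4 are my own choices, not part of any API. Note that because each check-and-append has to happen entirely under the lock, the reduce itself is effectively serialized, so the parallel win comes mainly from the generation step:

from multiprocessing import Process, Lock, Manager

def reduce_worker(chunk, reduced, lock):
    #chunk is this worker's share of newdata; reduced is a Manager list
    #shared by all the workers
    for d in chunk:
        #the whole check-and-append must be one critical section, or two
        #processes could both decide an item is unseen and both append it
        with lock:
            if not any(d.matches(seen) for seen in reduced):
                reduced.append(d)

if __name__ == '__main__':
    manager = Manager()
    reduced = manager.list()
    lock = Lock()
    chunks = [newdata[i::4] for i in range(4)] #one strided slice per worker
    procs = [Process(target=reduce_worker, args=(c, reduced, lock))
             for c in chunks]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    reduced = list(reduced) #back to a plain list for later use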

Jared Forsyth