I'm having trouble with the multiprocessing module. I'm using a Pool of workers and its map method to load data from lots of files, and I analyze the data from each file with a custom function. Each time a file has been processed, I would like a counter to be updated so that I can keep track of how many files remain to be processed. Here is sample code:

import os
from multiprocessing import Pool

def analyze_data( args ):
    # do something
    counter += 1
    print(counter)


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

I can't find a solution for this.

A:

The problem is that the counter variable is not shared between your processes: each separate process creates its own local instance and increments only that.
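To see this concretely, here is a minimal sketch (Python 3, assuming the file is run directly as a script so the `__main__` guard applies; `bump` and `run_unshared` are hypothetical names) in which each worker increments a plain module-level global. Whatever values the workers report, the parent's `counter` is still 0 afterwards, because every worker only ever touched its own copy.

```python
from multiprocessing import Pool

# A plain module-level int: every worker process ends up with its own copy.
counter = 0


def bump(_item):
    """Increment this process's private copy of counter and return it."""
    global counter
    counter += 1
    return counter


def run_unshared(n_items=8, n_workers=2):
    """Map bump over n_items inputs and return the value each call saw."""
    with Pool(n_workers) as pool:
        return pool.map(bump, range(n_items), chunksize=1)


if __name__ == '__main__':
    seen = run_unshared()
    print('values seen in the workers:', seen)
    # The workers never modified the parent's copy.
    print('counter in the parent:', counter)
```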

See the "Sharing state between processes" section of the multiprocessing documentation for some techniques you can use to share state between your processes. In your case, you might want to share a Value instance among your workers.
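As a minimal illustration of that technique, the sketch below (Python 3; `add_many` and `run_shared` are hypothetical helper names) has several `Process` objects increment one `Value('i', 0)`. Note that `shared.value += 1` is a read followed by a write and is not atomic on its own, so each increment is wrapped in the lock returned by `get_lock()`; with the lock held, four processes doing 1000 increments each end at exactly 4000.

```python
from multiprocessing import Process, Value


def add_many(shared, times):
    """Increment a shared Value `times` times, holding its lock for each update."""
    for _ in range(times):
        # `shared.value += 1` reads and then writes, so without the lock two
        # processes could read the same old value and lose an increment.
        with shared.get_lock():
            shared.value += 1


def run_shared(n_procs=4, times=1000):
    shared = Value('i', 0)  # typecode 'i' = C int, initial value 0
    procs = [Process(target=add_many, args=(shared, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return shared.value


if __name__ == '__main__':
    print(run_shared())  # 4 processes x 1000 increments -> 4000
```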

Here's a working version of your example (with some dummy input data). Note that it uses global variables, which I would really try to avoid in practice:

from multiprocessing import Pool, Value

counter = None

def init(shared_counter):
    ''' store the counter for later use '''
    global counter
    counter = shared_counter

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    # counter.value += 1 is a read followed by a write, so hold the
    # Value's lock around it or concurrent workers can lose updates
    with counter.get_lock():
        counter.value += 1
        print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter
    # as it starts.
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print(i.get())
    p.close()
    p.join()
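If the goal is only to report progress, a different approach from the one above avoids shared state altogether: let the parent count results as they come back. This is a minimal sketch assuming Python 3 and using `Pool.imap_unordered`; `analyze_data` here is a hypothetical stand-in for the real per-file work, and `run_with_progress` is an illustrative name.

```python
from multiprocessing import Pool


def analyze_data(item):
    """Hypothetical stand-in for the real per-file analysis."""
    return item * 10


def run_with_progress(inputs, n_workers=2):
    results = []
    total = len(inputs)
    with Pool(n_workers) as pool:
        # Results arrive in completion order; only the parent counts them,
        # so nothing has to be shared with the workers.
        for done, result in enumerate(pool.imap_unordered(analyze_data, inputs), 1):
            results.append(result)
            print('%d done, %d remaining' % (done, total - done))
    return results


if __name__ == '__main__':
    print(sorted(run_with_progress([1, 2, 3, 4])))
```

Because `imap_unordered` yields results as soon as each one finishes, the returned list may not follow the input order; `imap` keeps input order at the cost of waiting on slower items.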
jkp
Great answer! I had the same problem in IronPython, and although multiprocessing.Value is not available there, you can do something similar with clr.Reference and System.Threading.Interlocked: http://stackoverflow.com/questions/2255461/how-to-atomically-increment-a-static-member-in-ironpython/2314858#2314858
Greg Bray