I have a numpy array of 640x480 images, stacked 630 images deep; the total array is thus 630x480x640. I want to generate an average image, as well as compute the standard deviation of each pixel across all 630 images.

This is easily accomplished by

avg_image = numpy.mean(img_array, axis=0)
std_image = numpy.std(img_array, axis=0)

However, since I'm running this for 50 or so such arrays, and have an 8-core/16-thread workstation, I figured I'd get greedy and parallelize things with multiprocessing.Pool.

So I did the following:

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #one chunk per image row: each chunk holds that row from all 630 images (630x640)
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

However, I saw only a small speedup. By putting print statements in chunk_avg_map, I was able to determine that only one or two worker processes appear to be active at any given time, rather than 16 (as I would expect).
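
(The print statements were nothing elaborate; something along the lines of the sketch below, which tags each call with the name of the pool worker that ran it, is enough to see how many workers are busy. It assumes the same numpy and multiprocessing imports as the full script further down.)

def chunk_avg_map(chunk):
    #report which pool worker picked this chunk up
    name = multiprocessing.current_process().name
    print("%s: processing chunk of shape %s" % (name, chunk.shape))
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])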

I then ran my code through cProfile in IPython:

%prun current_image_anal.main()

The result indicated that by far the most time was spent in calls to acquire:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1527  309.755    0.203  309.755    0.203 {built-in method acquire}

I understand this to have something to do with locking, but I don't understand why my code would be spending so much time there. Does anyone have any ideas?

[EDIT] As requested, here is a runnable script which demonstrates the problem. You can profile it by whatever means you like (one self-contained way is shown just after the script), but when I did, I found that the lion's share of the time was taken up by calls to acquire, rather than by mean or std as I would have expected.

#!/usr/bin/python
import numpy
import multiprocessing

def main():
    fake_images = numpy.random.randint(0,2**14,(630,480,640))
    chunk_avg(fake_images)

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #one chunk per image row: each chunk holds that row from all 630 images (630x640)
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    main()
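
For example, swapping the __main__ block for a call to the standard-library profiler (just one way to do it) gives the same kind of per-function table as %prun:

if __name__ == "__main__":
    import cProfile
    cProfile.run("main()")   #prints a per-function profile table to stdout
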
+3  A: 

I believe the problem is that the amount of CPU time it takes to process each chunk is small relative to the amount of time it takes to copy the input and output to and from the worker processes. I modified your example code to split the input into 16 even chunks and to print the difference in CPU time (time.clock()) between when a run of chunk_avg_map() begins and ends. On my system each individual run took slightly under a second of CPU time, but the overall CPU time usage for the process group (system + user time) was more than 38 seconds. At roughly 0.75 seconds of copying overhead per chunk, your program performs its calculations only slightly faster than multiprocessing can deliver the data, so only about two worker processes are ever utilized at once.
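
In outline, the modified chunking and worker looked something like the sketch below; numpy.array_split handles the even 16-way split (480 rows gives slabs of 30 rows each), and the exact print formatting is incidental:

import time
import numpy
import multiprocessing

def chunk_avg_map(chunk):
    start = time.clock()   #CPU time in this worker at entry
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    print("chunk CPU time: %.2f s" % (time.clock() - start))
    return([sig_avg, sig_std])

def chunk_avg(img_data):
    #16 contiguous slabs of rows instead of 480 single-row chunks
    chunks = numpy.array_split(img_data, 16, axis=1)
    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

The important point is only that each worker call now does enough computation to be measured against the cost of pickling its chunk back and forth.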

If I modify the code so that the "input data" is just xrange(16) and the random array is built within chunk_avg_map(), then I see the system + user time drop to around 19 seconds, with all 16 worker processes executing at the same time.
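
That version is, again only in sketch form, roughly the following; each worker generates a sixteenth-sized slab itself so the total amount of computation stays the same, and the i argument exists only to give map() something to iterate over (the imports and __main__ guard from the script above are unchanged):

def main():
    chunk_avg()

def chunk_avg_map(i):
    #generate the data inside the worker, so nothing large is pickled in
    chunk = numpy.random.randint(0, 2**14, (630, 30, 640))
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg():
    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, xrange(16))
    pool.close()
    pool.join()
    return result

The mean and std arrays still have to be pickled back to the parent, but they are small compared with the input slabs.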

llasram