I run across a lot of "embarrassingly parallel" projects I'd like to parallelize with the multiprocessing module. However, they often involve reading in huge files (greater than 2 GB), processing them line by line, running basic calculations, and then writing results. What's the best way to split a file and process it using Python's multiprocessing module? Should Queue or JoinableQueue in multiprocessing be used? Or the Queue module itself? Or should I map the file iterable over a pool of processes using multiprocessing? I've experimented with these approaches, but the overhead is immense when distributing the data line by line. I've settled on a lightweight pipes-and-filters design using cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2, which passes a certain percentage of the first process's input directly to the second process's input (see this post), but I'd like to have a solution contained entirely in Python.
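
For concreteness, "mapping the file iterable over a pool of processes" means something like the minimal sketch below; process_line is a placeholder for the real per-line work and the file names are made up. The per-line cost of shipping data to the workers is where the overhead appears.

```python
import multiprocessing

def process_line(line):
    # placeholder for the real per-line work: parse the line, do a small calculation
    return len(line.split())

if __name__ == "__main__":
    with open("huge_input.txt") as infile, open("results.txt", "w") as outfile:
        with multiprocessing.Pool(processes=4) as pool:
            # chunksize batches lines per task, but serializing every line to the
            # workers is still the dominant cost when the per-line work is cheap
            for result in pool.imap(process_line, infile, chunksize=1000):
                outfile.write("%d\n" % result)
```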

Surprisingly, the Python documentation doesn't suggest a canonical way of doing this (despite a lengthy section on programming guidelines in the multiprocessing documentation).

Thanks, Vince

Additional information: Processing time per line varies. Some problems are fast and barely more than I/O-bound; some are CPU-bound. The CPU-bound, non-dependent tasks will gain the most from parallelization, such that even inefficient ways of assigning data to a processing function would still be beneficial in terms of wall-clock time.

A prime example is a script that extracts fields from lines, checks a variety of bitwise flags, and writes lines with certain flags to a new file in an entirely new format. This seems like an I/O-bound problem, but when I ran it with my cheap concurrent version using pipes, it was about 20% faster. When I run it with Pool and map, or with a Queue in multiprocessing, it is always over 100% slower.
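
For illustration only, a hypothetical single-process version of such a script might look like the following; the field positions, flag value, and output format are invented, not taken from the actual script.

```python
KEEP_FLAG = 0x10                                   # hypothetical bitwise flag

def filter_file(in_path, out_path):
    """Keep lines whose flag field has KEEP_FLAG set and rewrite them in a new format."""
    with open(in_path) as infile, open(out_path, "w") as outfile:
        for line in infile:
            fields = line.rstrip("\n").split("\t")
            flags = int(fields[1])                 # assume the flags live in column 2
            if flags & KEEP_FLAG:
                outfile.write("%s,%s\n" % (fields[0], fields[2]))

if __name__ == "__main__":
    filter_file("records.txt", "kept.csv")         # made-up file names
```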

+1  A: 

It depends a lot on the format of your file.

Does it make sense to split it anywhere? Or do you need to split it at a new line? Or do you need to make sure that you split it at the end of an object definition?

Instead of splitting the file, you should use multiple readers on the same file, using os.lseek to jump to the appropriate part of the file.

Update: the poster added that he wants to split on newlines, so I propose the following:

Let's say you have 4 processes. Then the simple solution is to os.lseek to 0%, 25%, 50% and 75% of the file, and read bytes until you hit the first newline. That's your starting point for each process. You don't need to split the file to do this; just seek to the right location in the large file in each process and start reading from there.
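
A minimal sketch of that idea, assuming four processes and a made-up file name, and using file.seek() on an ordinary file object rather than raw os.lseek for brevity:

```python
import os
import multiprocessing

def worker(path, start, end):
    """Process the lines whose first byte falls in [start, end)."""
    with open(path, "rb") as f:
        if start != 0:
            f.seek(start - 1)
            f.readline()              # consume the line that straddles the boundary
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            # process `line` here; each line is handled by exactly one worker

if __name__ == "__main__":
    path = "huge_input.txt"           # assumption
    size = os.path.getsize(path)
    n_procs = 4
    bounds = [size * i // n_procs for i in range(n_procs)] + [size]
    procs = [multiprocessing.Process(target=worker, args=(path, bounds[i], bounds[i + 1]))
             for i in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```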

Mark Byers
Split on newline.
Vince
I've updated the comment to explain how to use os.lseek in your case.
Mark Byers
+4  A: 

You don't mention how you are processing the lines, which is possibly the most important piece of information.

Is each line independent? Is the calculation dependent on one line coming before the next? Must they be processed in blocks? How long does the processing for each line take? Is there a processing step that must incorporate "all" the data at the end? Or can intermediate results be thrown away and just a running total maintained? Can the file be split initially by dividing the file size by the count of threads? Or does it grow as you process it?

If the lines are independent and the file doesn't grow, the only coordination you need is to farm out "starting addresses" and "lengths" to each of the workers; they can independently open and seek into the file, and then you simply coordinate their results, perhaps by waiting for N results to come back into a queue.

If the lines are not independent, the answer will depend heavily on the structure of the file.
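
For the independent-lines case, a minimal sketch of the "starting addresses and lengths" idea might look like this; the file name, chunk count, and count_kept_lines function are assumptions, and a Pool is used here to collect the per-chunk results instead of a hand-rolled queue:

```python
import os
import multiprocessing

def chunk_boundaries(path, n_chunks):
    """Scan once for newline-aligned (start, length) pairs covering the file."""
    size = os.path.getsize(path)
    offsets = [0]
    with open(path, "rb") as f:
        for i in range(1, n_chunks):
            f.seek(size * i // n_chunks)
            f.readline()                          # advance to the next line start
            offsets.append(f.tell())
    offsets.append(size)
    return [(offsets[i], offsets[i + 1] - offsets[i]) for i in range(n_chunks)]

def count_kept_lines(args):
    """Hypothetical per-chunk job: open the file independently and report a count."""
    path, start, length = args
    kept = 0
    with open(path, "rb") as f:
        f.seek(start)
        for line in f.read(length).splitlines():
            if line.strip():                      # stand-in for the real keep/discard test
                kept += 1
    return kept

if __name__ == "__main__":
    path = "huge_input.txt"                       # assumption
    chunks = chunk_boundaries(path, 4)
    with multiprocessing.Pool(processes=4) as pool:
        per_chunk = pool.map(count_kept_lines, [(path, s, l) for s, l in chunks])
    print(sum(per_chunk))
```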

Joe Koberg
Sorry, each line is independent; nothing is dependent, nothing is shared (except for optional counters). A classic example is a function that takes a line, decides whether to keep it, performs some minor calculations on kept lines, formats these calculations, and then writes the lines to a file for that process. All the files can then be concatenated together in a separate process. Regarding file seeking: seeking in Python is done by byte count, which could introduce complexity in matching lines to byte offsets. Is it worth it?
Vince
PS: the file does not grow, intermediate results are appended to a file (one file per process to prevent I/O write conflicts). This truly is an embarrassingly parallel problem.
Vince
+1  A: 

I know you specifically asked about Python, but I encourage you to look at Hadoop (http://hadoop.apache.org/): it implements MapReduce, which was specifically designed to address this kind of problem.

Good luck

Arrieta
You have no idea yet if it was designed for this problem. As others have pointed out, we don't know enough about the problem.
San Jacinto
@San Jacinto: I read "they often involve reading in huge files (greater than 2gb), processing them line by line, running basic calculations, and then writing results", and that's good enough for me, as I am not giving a specific implementation detail but a general observation. Chill, dude.
Arrieta
I have used Hadoop and map/reduce before. I love both, and map/reduce can be (and to some extent is being) applied here. Hadoop solves some I/O problems with its HDFS (IIRC). I am asking about the step before map/reduce: which approach to take to divide a file so that a function can be mapped over it. A queue? A file iterable?
Vince
@Arrieta, no problems here. Just stating an observation as well: you gave an answer to a question that wasn't asked :). If you have taken offense, please re-read it as "we have" instead of "you have." You will notice I didn't downvote.
San Jacinto
A: 

If the run time is long, instead of having each process read its next line through a Queue, have the processes read batches of lines. This way the overhead is amortized over several lines (e.g. thousands or more).
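
A minimal sketch of the batching idea, assuming made-up file names, a batch size of a few thousand lines, and one output file per worker:

```python
import multiprocessing

BATCH_SIZE = 5000                                  # assumption: amortize over thousands of lines

def worker(queue, out_path):
    with open(out_path, "w") as out:
        while True:
            batch = queue.get()
            if batch is None:                      # sentinel: no more work
                break
            for line in batch:
                out.write(line)                    # stand-in for the real per-line processing

if __name__ == "__main__":
    n_workers = 4
    queue = multiprocessing.Queue(maxsize=10)      # bound memory to a few in-flight batches
    workers = [multiprocessing.Process(target=worker, args=(queue, "out.%d.txt" % i))
               for i in range(n_workers)]
    for p in workers:
        p.start()
    batch = []
    with open("huge_input.txt") as infile:         # assumption
        for line in infile:
            batch.append(line)
            if len(batch) >= BATCH_SIZE:
                queue.put(batch)
                batch = []
    if batch:
        queue.put(batch)
    for _ in workers:
        queue.put(None)                            # one sentinel per worker
    for p in workers:
        p.join()
```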

orip
+5  A: 

One strategy is to assign each worker an offset, so if you have eight worker processes you assign them the numbers 0 to 7. Worker number 0 reads the first record, processes it, then skips 7 and goes on to process the 8th record, and so on; worker number 1 reads the second record, then skips 7 and processes the 9th record, and so on.

There are a number of advantages to this scheme. It doesn't matter how big the file is, the work is always divided evenly; processes on the same machine will process at roughly the same rate and use the same buffer areas, so you don't incur any excessive I/O overhead. As long as the file hasn't been updated, you can rerun individual workers to recover from failures.
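
A minimal sketch of this interleaved scheme, assuming eight workers, a made-up input file, and one output file per worker; each worker reads the whole file but only processes its own share, relying on the OS page cache to keep the repeated reads cheap:

```python
import multiprocessing

def worker(path, worker_id, n_workers):
    """Read the whole file but process only every n_workers-th line."""
    with open(path) as infile, open("out.%d.txt" % worker_id, "w") as out:
        for lineno, line in enumerate(infile):
            if lineno % n_workers == worker_id:
                out.write(line)                    # stand-in for the real processing

if __name__ == "__main__":
    n_workers = 8
    procs = [multiprocessing.Process(target=worker,
                                     args=("huge_input.txt", k, n_workers))
             for k in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```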

James Anderson
+5  A: 

One of the best architectures for this is already part of the Linux OS. No special libraries are required.

You want a "fan-out" design.

  1. A "main" program creates a number of subprocesses connected by pipes.

  2. The main program reads the file, writing lines to the pipes and doing the minimum filtering required to deal the lines to the appropriate subprocesses.

Each subprocess should probably be a pipeline of distinct processes that read from stdin and write to stdout.

You don't need a queue data structure; that's exactly what an in-memory pipe is: a queue of bytes between two concurrent processes.
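
A minimal sketch of the fan-out design using only the standard library; worker.py is a hypothetical filter that reads lines from stdin and writes its own output file, and round-robin is used here as the simplest possible "dealing" policy:

```python
import itertools
import subprocess
import sys

if __name__ == "__main__":
    n_workers = 4
    # each worker is a separate process reading lines from its stdin pipe
    workers = [subprocess.Popen([sys.executable, "worker.py", "out.%d.txt" % i],
                                stdin=subprocess.PIPE)
               for i in range(n_workers)]
    dealer = itertools.cycle(workers)
    with open("huge_input.txt", "rb") as infile:   # assumption
        for line in infile:
            next(dealer).stdin.write(line)         # round-robin "dealing" of lines
    for w in workers:
        w.stdin.close()                            # EOF tells each worker to finish
        w.wait()
```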

S.Lott
I will look at implementing this approach in Python, since the multiprocessing module has pipes. As you can see in the original post, I use this approach in the shell with great success. I had naïvely thought I could never achieve data parallelism with pipes.
Vince
Simple shell pipes are the ideal form of parallelism. It's what Linux does best. It's often the perfect solution.
S.Lott
Here is the result: http://github.com/vsbuffalo and the results on a 32-CPU machine: http://paste.pocoo.org/show/154252/. Thanks S.Lott!
Vince
@Vince: Please use the `enumerate` function instead of your own counter. It knocks two more lines of code out of the thing, leading to yet more simplification. The OS features -- for simple parallel processes -- are sometimes all you need.
S.Lott
+1  A: 

Fredrik Lundh's "Some Notes on Tim Bray's Wide Finder Benchmark" is an interesting read about a very similar use case, with a lot of good advice. Various other authors also implemented the same thing; some are linked from the article, but you might want to try googling for "python wide finder" or something similar to find more. (There was also a solution somewhere based on the multiprocessing module, but that doesn't seem to be available anymore.)

Steven
It's too bad that the source for many of the submissions is hard to track down. There are lots of useful techniques to be learned from the entries to widefinder/widefinder2.
jmanning2k