I'm new to Python and making some headway with threading - I'm doing some music file conversion and want to be able to utilize the multiple cores on my machine (one active conversion thread per core).

import subprocess
import threading

class EncodeThread(threading.Thread):
    # this is hacked together a bit, but should give you an idea
    def __init__(self, src, dest):
        threading.Thread.__init__(self)
        self.src = src
        self.dest = dest

    def run(self):  # decode flac to stdout, pipe it straight into lame
        decode = subprocess.Popen(["flac", "--decode", "--stdout", self.src],
                                  stdout=subprocess.PIPE)
        encode = subprocess.Popen(["lame", "--quiet", "-", self.dest],
                                  stdin=decode.stdout)
        encode.communicate()

# some other code puts these threads with various src/dest pairs in a list

for proc in threads: # `threads` is my list of `threading.Thread` objects
    proc.start()

Everything works, all the files get encoded, bravo! ... however, all the processes spawn immediately, yet I only want to run two at a time (one for each core). As soon as one is finished, I want it to move on to the next one in the list until the list is finished, then continue with the program.

How do I do this?

(I've looked at the thread pool and queue functions but I can't find a simple answer.)

Edit: maybe I should add that each of my threads is using subprocess.Popen to run a separate command line decoder (flac) piped to stdout which is fed into a command line encoder (lame/mp3).

+1  A: 

If you're using the default "CPython" implementation then this won't help you, because only one thread can execute Python bytecode at a time; look up the Global Interpreter Lock (GIL). Instead, I'd suggest looking at the multiprocessing module in Python 2.6 -- it makes parallel programming a cinch. You can create a Pool object with 2*num_processors processes, and give it a bunch of tasks to do. It will execute up to 2*num_processors tasks at a time, until all are done.
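For illustration, here's a minimal sketch of that Pool approach; the encode_file function and the job list are hypothetical stand-ins for the real conversion work:

import multiprocessing

def encode_file(job):
    # hypothetical stand-in for one flac -> mp3 conversion
    src, dest = job
    return dest

if __name__ == "__main__":
    jobs = [("a.flac", "a.mp3"), ("b.flac", "b.mp3")]
    pool = multiprocessing.Pool(processes=2 * multiprocessing.cpu_count())
    results = pool.map(encode_file, jobs)  # blocks until every job is done
    pool.close()
    pool.join()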

At work I have recently migrated a bunch of Python XML tools (a differ, an XPath grepper, and a bulk XSLT transformer) to use this, and have had very nice results with two processes per processor.

Edmund
I am going to check out multiprocessing ...
thornomad
If your subprocesses will be executing functions in your Python code then the multiprocessing module is great. If you're calling an external program then this module doesn't offer an advantage over the subprocess module, because those external programs have no means of returning their results to the parent other than temporary files, pipes, etc. The huge IPC advantages of the multiprocessing module are lost on external programs that you'd exec. (Having each process in a multiprocessing pool call subprocess sounds pretty silly, for example.)
Jim Dennis
A: 

I am not an expert in this, but I have read something about "Lock"s. This article might help you out.
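(For limiting work to "at most N at a time", the closest fit among the locking primitives is a semaphore rather than a plain lock; a minimal sketch, assuming a hypothetical encode(src, dest) function that does one conversion:)

import threading

slots = threading.BoundedSemaphore(2)  # allow at most two encoders at once

class EncodeThread(threading.Thread):
    def __init__(self, src, dest):
        threading.Thread.__init__(self)
        self.src, self.dest = src, dest

    def run(self):
        slots.acquire()  # block until one of the two slots is free
        try:
            encode(self.src, self.dest)  # hypothetical per-file encoder
        finally:
            slots.release()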

Hope this helps

inspectorG4dget
+1  A: 

It looks to me like what you want is a pool of some sort, and in that pool you would like to have n threads where n == the number of processors on your system. You would then have another thread whose only job was to feed jobs into a queue which the worker threads could pick up and process as they became free (so for a dual core machine, you'd have three threads but the main thread would be doing very little).
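A minimal sketch of that layout, assuming a hypothetical encode(src, dest) function and a src_dest_pairs job list (Queue is the Python 2.x module name):

import threading
import Queue  # named "queue" in Python 3

num_workers = 2      # one worker per core
jobs = Queue.Queue()

def worker():
    while True:
        src, dest = jobs.get()  # blocks until a job is available
        try:
            encode(src, dest)   # hypothetical per-file encoder
        finally:
            jobs.task_done()

for _ in range(num_workers):
    t = threading.Thread(target=worker)
    t.setDaemon(True)  # workers die along with the main thread
    t.start()

for pair in src_dest_pairs:  # hypothetical list of (src, dest) tuples
    jobs.put(pair)

jobs.join()  # block until every queued job has been processed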

As you are new to Python, though, I'll assume you don't know about the GIL and its side-effects with regard to threading. If you read the article I linked you will soon understand why traditional multithreading solutions are not always the best in the Python world. Instead you should consider using the multiprocessing module (new in Python 2.6; in 2.5 you can use this backport) to achieve the same effect. It side-steps the issue of the GIL by using multiple processes as if they were threads within the same application. There are some restrictions about how you share data (you are working in different memory spaces) but actually this is no bad thing: it just encourages good practice such as minimising the contact points between threads (or processes in this case).

In your case you are probably interested in using a pool as specified here.

jkp
Thanks - I will look at multiprocess ... and I edited my question to have a bit more detail ... it seems that subprocess.Popen does sort of break off and do its own thing.
thornomad
The multiprocessing module, BTW, is a wonderful addition to 2.6 (derived from the third-party pyprocessing module, which supports 2.4 and 2.5). It is, however, not as well suited to running external programs. The main advantages of the multiprocessing module lie in the ways that it's modeled after the threading support. You can create Queue()s as the primary inter-(thread/process) communications mechanism to eliminate most of the need for your own explicit locking. (Queue()s provide coherent support for multiple producers and consumers of arbitrary objects.) Great if the children run Python code.
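A minimal sketch of that Queue()-based pattern, for children that run Python code (the doubling is just illustrative work):

import multiprocessing

def worker(inbox, outbox):
    for item in iter(inbox.get, None):  # a None is the shutdown signal
        outbox.put(item * 2)            # stand-in for real Python work

if __name__ == "__main__":
    inbox, outbox = multiprocessing.Queue(), multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(inbox, outbox))
             for _ in range(2)]
    for p in procs:
        p.start()
    for n in range(10):
        inbox.put(n)        # multiple producers would work here too
    for _ in procs:
        inbox.put(None)     # one shutdown signal per worker
    results = [outbox.get() for _ in range(10)]
    for p in procs:
        p.join()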
Jim Dennis
+4  A: 

"Each of my threads is using subprocess.Popen to run a separate command line [process]".

Why have a bunch of threads manage a bunch of processes? That's exactly what an OS does for you. Why micro-manage what the OS already manages?

Rather than fool around with threads overseeing processes, just fork off processes. Your process table probably can't handle 2000 processes, but it can handle a few dozen (maybe a few hundred) pretty easily.

You want to have more work queued up than your CPUs can possibly handle. The real question is one of memory -- not processes or threads. If the sum of all the active data for all the processes exceeds physical memory, then data has to be swapped, and that will slow you down.

If your processes have a fairly small memory footprint, you can have lots and lots running. If your processes have a large memory footprint, you can't have very many running.

S.Lott
Heh. I see now the tomfoolery of my hacked-together approach - it is a bit redundant. So, is there a way with subprocess to manage a "pool" (as others have suggested)? Thanks for your input. Learning as I go ... would it just be a matter of using `Popen.poll()` to see what's done and what's still running? Thanks again.
thornomad
Correct. You could use a simple set of processes; remove ones that are finished. Add ones and keep the size of the set under some limit. It's just a set with `add` and `remove`.
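A minimal sketch of that set-based approach (cmds is a hypothetical list of command argument lists, e.g. the flac/lame invocations):

import subprocess
import time

limit = 2        # at most two concurrent children
running = set()

for args in cmds:  # hypothetical list of command argument lists
    while len(running) >= limit:
        finished = [p for p in running if p.poll() is not None]
        for p in finished:
            running.remove(p)
        if len(running) >= limit:
            time.sleep(0.1)  # avoid a busy loop while every slot is full
    running.add(subprocess.Popen(args))

while running:  # wait for the stragglers
    running.pop().wait()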
S.Lott
+1  A: 

Short answer: don't use threads.

For a working example, you can look at something I've recently tossed together at work. It's a little wrapper around ssh which runs a configurable number of Popen() subprocesses. I've posted it at: Bitbucket: classh (Cluster Admin's ssh Wrapper).

As noted, I don't use threads; I just spawn off the children, loop over them calling their .poll() methods, check for timeouts (also configurable), and replenish the pool as I gather the results. I've played with different sleep() values, and in the past I wrote a version (before the subprocess module was added to Python) that used the signal module (SIGCHLD and SIGALRM) and the os.fork() and os.execve() functions, along with my own pipe and file descriptor plumbing, etc.
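A minimal sketch of just the timeout side of that loop (spawning everything at once for brevity; cmds and TIMEOUT are illustrative assumptions, and in practice you'd combine this with a cap on pool size as in classh):

import os
import signal
import subprocess
import time

TIMEOUT = 300.0   # illustrative per-child timeout, in seconds
started = {}      # Popen object -> start time

for cmd in cmds:  # hypothetical list of command argument lists
    started[subprocess.Popen(cmd)] = time.time()

while started:
    for p in list(started):
        if p.poll() is not None:
            del started[p]    # finished: gather its result here
        elif time.time() - started[p] > TIMEOUT:
            os.kill(p.pid, signal.SIGTERM)  # overdue: kill it off
            p.wait()                        # reap the killed child
            del started[p]
    time.sleep(0.25)  # the polling sleep() mentioned above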

In my case I'm incrementally printing results as I gather them ... and remembering all of them to summarize at the end (when all the jobs have completed or been killed for exceeding the timeout).

I ran that, as posted, on a list of 25,000 internal hosts (many of which are down, retired, located internationally, not accessible to my test account, etc). It completed the job in just over two hours and had no issues. (About 60 of them were timeouts due to systems in degenerate/thrashing states -- proving that my timeout handling works correctly.)

So I know this model works reliably. Running 100 concurrent ssh processes with this code doesn't seem to cause any noticeable impact. (It's a moderately old FreeBSD box.) I used to run the old (pre-subprocess) version with 100 concurrent processes on my old 512MB laptop without problems, too.

(BTW: I plan to clean this up and add features to it; feel free to contribute or to clone off your own branch of it; that's what Bitbucket.org is for).

Jim Dennis
Thanks - I will look at that a little more closely today. I quickly came up with a pretty simple set of while loops that seems to be working, just checking the `p.communicate()` method. (PS: I think you are missing a closing `'''` on line 4 in the source.)
thornomad