Hi there,

I am a Python newbie so please be gentle :)

I have a Python script that I want to use as a controller to another Python script. I have a server with 64 processors, so I want to spawn up to 64 child processes of this second Python script. The child script is called like this:

%> python create_graphs.py --name=NAME

where NAME is something like XYZ, ABC, NYU etc.

In my parent controller script I retrieve the name variable from a list:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

So my question is: what is the best way to spawn off these processes as children? I want to limit the number of children to 64 at a time, so I need to track the status of each child (whether it has finished or not) so I can efficiently keep the whole generation running.

I looked into using the subprocess package, but rejected it because it only spawns one child at a time. I finally found the multiprocessing package, but I admit to being overwhelmed by the whole threads vs. subprocesses documentation.

Right now, my script uses subprocess.call to only spawn one child at a time and looks like this:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if i < processors:
            cmd = ["python", "/path/to/create_graphs.py", "--name=" + my_list[i]]
            # subprocess.call blocks until the child exits, so this only
            # ever runs one child at a time
            child = subprocess.call(cmd, shell=False)

I really want it to spawn up to 64 children at a time. In other stackoverflow questions I saw people using Queue, but it seems like that creates a performance hit? All help appreciated. Thanks in advance.

A: 

I would definitely use multiprocessing rather than rolling my own solution using subprocess.

Aaron Maenpaa
A: 

I don't think you need a queue unless you intend to get data out of the applications (and if you do want data, I think it may be easier to add it to a database anyway)

but try this on for size:

put the contents of your create_graphs.py script all into a function called "create_graphs"

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        # rebuild the list rather than removing items while iterating over
        # it, which would skip elements
        threads = [t for t in threads if t.isAlive()]

I know that this will result in one fewer worker thread than processors, which is probably good: it leaves a processor free to manage the threads, disk I/O, and other things happening on the computer. If you decide you want to use the last core, just add one to `num_processes`.

edit: I think I may have misinterpreted the purpose of my_list. You do not need my_list to keep track of the threads at all (they're all referenced by the items in the threads list). But it is a fine way of feeding the threads input - or even better: use a generator function ;)

The purpose of my_list and threads

my_list holds the data that you need to process in your function
threads is just a list of the currently running threads

the while loop does two things: it starts new threads to process the data, and it checks whether any threads are done running.

So as long as you have either (a) more data to process, or (b) threads that aren't finished running, you want the program to continue running. Once both lists are empty they will evaluate to False and the while loop will exit.
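The generator function mentioned in the edit above could feed the loop like this. This is only a sketch: the `create_graphs` body and the station list are stand-ins (the real function would do the actual graph work), and a results list plus lock are added just so the example is self-contained:

```python
import threading
import time

def create_graphs(name, results, lock):
    # stand-in for the real create_graphs(name): just record what was handled
    with lock:
        results.append(name)

def station_names():
    # generator yielding one station name at a time; in practice this could
    # read names lazily from a file or database
    for name in ['XYZ', 'ABC', 'NYU']:
        yield name

num_processes = 2          # pretend we only have 2 slots, to exercise the logic
threads = []
results = []
lock = threading.Lock()
feed = station_names()
pending = next(feed, None)  # the next name waiting to be processed, or None

# same shape as the while loop above: run while threads are alive or data is left
while threads or pending is not None:
    if pending is not None and len(threads) < num_processes:
        t = threading.Thread(target=create_graphs, args=(pending, results, lock))
        t.daemon = True
        t.start()
        threads.append(t)
        pending = next(feed, None)
    else:
        # prune finished threads; sleep briefly so we don't busy-spin
        threads = [t for t in threads if t.is_alive()]
        time.sleep(0.01)

print(sorted(results))  # ['ABC', 'NYU', 'XYZ']
```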

Jiaaro
Hi Jim. I have not seen this while syntax before. You have two lists in the while clause - threads and my_list. What is the logic behind this? Everywhere I have seen Python while loops they have a definite less-than condition or some other way to control the loop. That appears to be missing here - or am I just asking a dumb newbie question? Thanks.
tatlar
edited answer to answer this question
Jiaaro
Hi Jim - thanks for the clarification. In my code, when I set up the while loop like this, my script just returns nothing (i.e. the while loop never starts). If I change the while to a for loop it works. Before: `while threads and my_list`. Now: `for list_item in my_list`. I wonder what I am doing wrong. I am going to post the full working script I have below - maybe you could take a look? Thanks so much for all the help.
tatlar
sure... just leave me another comment when you have added the script (so it will show up in my recent items list)
Jiaaro
added the script. Thanks Jim!
tatlar
I revisited this just to try and figure out why I never got it working. I figured it out. While statements proceed only when ALL conditions evaluate to True. Jim's while loop will never evaluate to True because the list 'threads' will evaluate to False at the start of the script as it is empty. This is why I could never get Jim's example to work - I could never get into the while loop successfully.
tatlar
@tatlar ahh sorry about that. It should have been an `or` not an `and` :(
Jiaaro
+9  A: 

What you are looking for is the process pool class in multiprocessing.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

And here is a calculation example to make it easier to understand. The following will divide 10000 tasks across N processes where N is the cpu count. Note that I'm passing None as the number of processes. This will cause the Pool class to use cpu_count for the number of processes (see the multiprocessing.Pool documentation)

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    # the callback receives the whole result list in one call, so this
    # prints a nested list: [[0, 10, 20, ...]]
    print results
Nadia Alramli
If each process in the pool is pulling a "request" from a common queue, then they'll keep themselves maximally busy until the queue's empty.
S.Lott
Thanks for the help. When I try and run your example code, I get the following error: "ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770." My server is a Sun Sparc (T5220, SunOS 5.10). How do I need to change this to run on my system?
tatlar
@tatlar, this is a known issue http://bugs.python.org/issue3770 unfortunately I'm not aware of a way to fix this.
Nadia Alramli
Okay thanks. With this limitation in mind, would it therefore be better to go with the solution presented by Jim Robert, i.e. using threading.Thread(), shown below?
tatlar
@tatlar, but how come your example in the question is importing multiprocessing and Queue without problems?
Nadia Alramli
Not sure. I just double checked and I don't get any errors on import. I guess the problem is that only certain methods fail on SunOS. From the docs: "Some of this package's functionality requires a functioning shared semaphore implementation on the host operating system." [http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool] I guess Pool is one of those???
tatlar
hmm, I would really like to help you. My problem is that I don't know what will or will not work on SunOS. My advice is to read the multiprocessing docs and see if there are other methods that might work. It is definitely better than starting from scratch.
Nadia Alramli
A: 

Here is the solution I came up with, based on Nadia's and Jim's comments. I am not sure if it is the best way, but it works. The original child script being called needs to be a shell script because I need to use some 3rd-party apps including Matlab, so I had to take it out of Python and code it in bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:', staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    # note the comma after "/bin/bash": without it Python silently
    # concatenates the two adjacent string literals into one bogus path
    cmd = [ "/bin/bash", "/path/to/executable/create_graphs.sh", "--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]
    my_list.sort()
    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []
    len_stas = len(my_list)
    print "+++ Number of stations to process: %s" % (len_stas)

    for list_item in my_list:

        # if we aren't using all the processors, spawn another process
        if len(threads) < num_processes:
            p = multiprocessing.Process(target=work, args=[list_item])
            p.start()
            print p, p.is_alive()
            threads.append(p)
        else:
            # prune finished processes to free up slots; rebuild the list
            # rather than removing items while iterating over it
            threads = [t for t in threads if t.is_alive()]

Does this seem like a reasonable solution? I tried to use Jim's while loop format, but my script just returned nothing. I am not sure why that would be. Here is the output when I run the script with Jim's 'while' loop replacing the 'for' loop:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

When I run it with the 'for' loop, I get something more meaningful:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

So this works, and I am happy. However, I still don't get why I can't use Jim's 'while' style loop instead of the 'for' loop I am using. Thanks for all the help - I am impressed with the breadth of knowledge @ stackoverflow.
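For reference, here is the while-loop shape I was trying to get working, sketched with threads and a stand-in `work` function so it runs on its own (per the SunOS limitation above); the same loop structure works with `multiprocessing.Process` by swapping the constructor and using `is_alive()` the same way. The key points are draining `my_list` with `pop()` and pruning finished workers:

```python
import threading
import time

def work(staname, done):
    # stand-in for subprocess.call on create_graphs.sh; just records
    # which station was handled
    done.append(staname)

my_list = ['XYZ', 'ABC', 'NYU']
num_processes = 2          # pretend only 2 slots, to exercise the logic
threads = []
done = []

# run until the list is drained AND every worker has finished; an empty
# list is falsy, so the loop exits only when both are empty
while threads or my_list:
    if my_list and len(threads) < num_processes:
        # pop() returns a name AND removes it from the list, so my_list
        # eventually empties and the loop can terminate
        t = threading.Thread(target=work, args=(my_list.pop(), done))
        t.start()
        threads.append(t)
    else:
        # prune finished workers; sleep briefly so we don't busy-spin
        threads = [t for t in threads if t.is_alive()]
        time.sleep(0.01)

print(sorted(done))  # ['ABC', 'NYU', 'XYZ']
```

Unlike the for loop above, this version keeps feeding new work as slots free up, so it handles more stations than cores.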

tatlar
you need to get the values out of my_list using the pop method, which returns the value AND removes it from the list. if you don't remove the items from the list when you're done with them, the while loop will never end.
Jiaaro
there is also a problem with the for loop: if you have more data to process than cores, you will hit the limit and just ignore the rest of the data (it won't all get processed)
Jiaaro
oops - okay, added the pop() method back in. however, I still see the same output that I posted above....
tatlar
if I add a print statement to the while loop (to make sure I am actually inside the loop) I don't see that output either. it is like the loop never gets entered?
tatlar