I want to

  1. call shell commands (for example 'sleep' below) in parallel,
  2. report on their individual starts and completions and
  3. be able to kill them with 'kill -9 parent_process_pid'.

A lot has already been written on these kinds of things, but I feel like I haven't quite found the elegant, pythonic solution I'm looking for. I'm also trying to keep things relatively readable (and short) for someone completely unfamiliar with Python.

My approach so far (see code below) has been:

  1. put subprocess.call(unix_command) in a wrapper function that reports the start and completion of the command.
  2. call the wrapper function with multiprocessing.Process.
  3. track the appropriate pids, store them globally, and kill them in the signal_handler.

I was trying to avoid a solution that periodically polled the processes but I'm not sure why.

Is there a better approach?

import subprocess,multiprocessing,signal
import sys,os,time

def sigterm_handler(signal, frame):
        print 'You killed me!'
        for p in pids:
                os.kill(p,9)
        sys.exit(0)

def sigint_handler(signal, frame):
        print 'You pressed Ctrl+C!'
        sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigterm_handler)

def f_wrapper(d):
        print str(d) + " start"
        p=subprocess.call(["sleep","100"])
        pids.append(p.pid)
        print str(d) + " done"

print "Starting to run things."

pids=[]

for i in range(5):
        p=multiprocessing.Process(target=f_wrapper,args=(i,))
        p.daemon=True
        p.start()

print "Got things running ..."

while pids:
        print "Still working ..."
        time.sleep(1)
+1  A: 

Once subprocess.call returns, the sub-process is done -- and call's return value is the sub-process's returncode, an integer with no .pid attribute. So accumulating those return codes in the list pids (which, by the way, is not shared between the child processes appending to it and the "main" process) and sending them signal 9 "as if" they were process ids instead of return codes is definitely wrong.
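
To make the difference concrete, here is a minimal sketch (Python 3 syntax, unlike the Python 2 code in the question) contrasting the two calls. The command is an assumption chosen for portability: it runs the current interpreter rather than 'sleep', and the exit status 3 is arbitrary.

```python
import subprocess
import sys

# Assumed stand-in for a shell command: the running interpreter, exiting with status 3.
cmd = [sys.executable, "-c", "import sys; sys.exit(3)"]

# call() blocks until the command has finished and returns its exit status (an int).
rc = subprocess.call(cmd)
print(type(rc).__name__, rc)

# Popen() returns right away with a process object that exposes .pid.
proc = subprocess.Popen(cmd)
print("child pid:", proc.pid)
print("exit status:", proc.wait())
```

Only the Popen object gives you something to signal while the command is still running; by the time call() hands back a value there is nothing left to kill.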

Another thing with the question that's definitely wrong is the spec:

be able to kill them with 'kill -9 parent_process_pid'.

since the -9 means the parent process can't possibly intercept the signal (that's the purpose of explicitly specifying -9) -- I imagine the -9 is therefore spurious here.
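
A small sketch of why the handler can never run for -9, assuming a POSIX platform (signal.SIGKILL is not defined on Windows) and Python 3. The helper name can_install_handler is made up for this illustration.

```python
import signal

def can_install_handler(signum):
    """Return True if a Python-level handler can be registered for signum."""
    try:
        previous = signal.signal(signum, lambda received, frame: None)
    except (OSError, RuntimeError, ValueError):
        # The OS refuses handlers for SIGKILL. ValueError also covers
        # calls made from a thread other than the main one.
        return False
    if previous is not None:
        signal.signal(signum, previous)  # put the earlier handler back
    return True

print("SIGTERM catchable:", can_install_handler(signal.SIGTERM))
print("SIGKILL catchable:", can_install_handler(signal.SIGKILL))
```

A plain 'kill parent_process_pid' sends SIGTERM, which is the signal the sigterm_handler in the question can actually intercept.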

You should be using threading instead of multiprocessing (each "babysitter" thread, or process, does essentially nothing but wait for its sub-process, so why waste processes on such a lightweight task?-); you should also call subprocess.Popen in the main thread (to get the sub-process started and be able to obtain its .pid to put in the list) and pass the resulting process object to the babysitter thread, which waits for it (and, when it's done, reports and removes it from the list). The list of sub-process objects should be guarded by a lock, since the main thread and several babysitter threads can all access it, and a set would probably be a better choice than a list (faster removals), since you care about neither ordering nor duplicates.

So, roughly (no testing, so there might be bugs;-) I'd change your code to something like:

import subprocess, threading, signal
import sys, time

pobs = set()
pobslock = threading.Lock()
def numpobs():
    with pobslock:
        return len(pobs)

def sigterm_handler(signal, frame):
    print 'You killed me!'
    with pobslock:
        for p in pobs: p.kill()
    sys.exit(0)

def sigint_handler(signal, frame):
    print 'You pressed Ctrl+C!'
    sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigterm_handler)

def f_wrapper(d, p):
    print d, 'start', p.pid
    rc = p.wait()
    with pobslock:
        pobs.remove(p)
    print d, 'done, rc =', rc

print "Starting to run things."

for i in range(5):
    p = subprocess.Popen(['sleep', '100'])
    with pobslock:
        pobs.add(p)
    t = threading.Thread(target=f_wrapper, args=(i, p))
    t.daemon=True
    t.start()

print "Got things running ..."

while numpobs():
    print "Still working ..."
    time.sleep(1)
Alex Martelli
Thanks, that's very helpful. I'll post back once I clean this up!
mathtick
@mathtick, you're welcome! I've edited my A to show how, roughly, I'd see the code best structured.
Alex Martelli
This, http://speculation.org/garrick/kill-9.html, was a good lesson for me. I think I picked up the "-9" habit when I first touched a Linux box.
mathtick
I've included a slightly modified version of the code above in a separate answer.
mathtick
@Alex: why is it that `sigterm_handler` must explicitly kill all the subprocesses while `sigint_handler` does not? The explicit `p.kill()` calls do indeed seem to be needed despite `t.daemon=True`. I'm also puzzled that the behavior of the program (after a SIGTERM or SIGINT) does not seem to change even if `t.daemon=True` is commented out. What is the purpose of `t.daemon=True`?
unutbu
@unutbu, depends on your platform's config and how those subprocesses are behaving (if they daemonize, that will have the _inverse_ effect from the `daemon=True` in Python, i.e., make them _independent_ of the parent process and in all likelihood produce different PIDs that you can't easily trace too;-). I'm as baffled as you are by the int/term difference -- I just copied it verbatim from the OP's original source (assuming he had his reasons for a control-C to behave differently in terms of subprocess termination from an explicit `kill` shell command).
Alex Martelli
@Alex: Thanks very much for the information.
unutbu
A: 

The code below seems to work for me, whether I kill it from "top" or press Ctrl-C at the command line. The only real change from Alex's suggestions was to replace subprocess.Process with a subprocess.Popen call (I don't think subprocess.Process exists).

The code here could also be improved by somehow locking stdout so that output printed by the watcher threads and the main thread cannot overlap.

import subprocess, threading, signal
import sys, time

pobs = set()                            # set to hold the active-process objects
pobslock = threading.Lock()     # a Lock object to make sure only one at a time can modify pobs

def numpobs():
        with pobslock:
                return len(pobs)

# signal handlers
def sigterm_handler(signal, frame):
        print 'You killed me! I will take care of the children.'
        with pobslock:
                for p in pobs: p.kill()
        sys.exit(0)

def sigint_handler(signal, frame):
        print 'You pressed Ctrl+C! The children will be dealt with automatically.'
        sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigterm_handler)


# a function to watch processes
def p_watch(d, p):
        print d, 'start', p.pid
        rc = p.wait()
        with pobslock:
                pobs.remove(p)
        print d, 'done, rc =', rc


# the main code
print "Starting to run things ..."
for i in range(5):
        p = subprocess.Popen(['sleep', '4'])
        with pobslock:
                pobs.add(p)
        # create and start a "daemon" to watch and report the process p.
        t = threading.Thread(target=p_watch, args=(i, p))
        t.daemon=True
        t.start()

print "Got things running ..."
while numpobs():
        print "Still working ..."
        time.sleep(1)
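
The stdout-locking idea mentioned above could be sketched roughly as follows. This is Python 3 syntax, and the names print_lock, report and worker are hypothetical, not part of the code above; the worker stands in for p_watch without starting any sub-process.

```python
import threading

print_lock = threading.Lock()  # hypothetical lock that serializes writes to stdout

def report(*parts):
    """Print one complete line while holding the lock."""
    with print_lock:
        print(*parts)

def worker(n):
    # Stand-in for p_watch: it only reports, without waiting on a sub-process.
    report(n, "start")
    report(n, "done")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Every print in the watcher threads and in the main loop would go through report(), so each line is written whole before another thread gets its turn. The order of lines across threads is still not fixed.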
mathtick
@Mathtick, you're right that there's no subprocess.Process -- sorry for the thinko;-, editing my A now to fix it.
Alex Martelli