views: 339

answers: 4
First of all, the overall problem I am solving is a bit more complicated than I am showing here, so please do not tell me 'use threads with blocking' as it would not solve my actual situation without a fair, FAIR bit of rewriting and refactoring.

I have several applications, which are not mine to modify, that take data from stdin and poop it out on stdout after doing their magic. My task is to chain several of these programs. Problem is, sometimes they choke, and as such I need to track their progress, which is output on STDERR.

pA = subprocess.Popen(CommandA, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# ... some more processes make up the chain, but that is irrelevant to the problem
pB = subprocess.Popen(CommandB, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=pA.stdout)

Now, reading directly through pA.stdout.readline() and pB.stdout.readline(), or the plain read() functions, blocks. Since different applications output at different paces and in different formats, blocking is not an option. (And as I wrote above, threading is not an option except as a last, last resort.) pA.communicate() is deadlock-safe, but since I need the information live, that is not an option either.

Thus Google brought me to this asynchronous subprocess snippet on ActiveState.

All good at first, until I implement it. Comparing against the cmd.exe output of pA.exe | pB.exe, and ignoring the fact that both output to the same window making for a mess, I see near-instantaneous updates. However, when I implement the same thing using the above snippet and the read_some() function declared there, it takes over 10 seconds to report updates from a single pipe. But when it does, it has updates leading all the way up to 40% progress, for example.

Thus I do some more research, and see numerous subjects concerning PeekNamedPipe, anonymous handles, and returning 0 bytes available even though there is information available in the pipe. As the subject has proven quite a bit beyond my expertise to fix or code around, I come to Stack Overflow to look for guidance. :)

My platform is W7 64-bit with Python 2.6, the applications are 32-bit in case it matters, and compatibility with Unix is not a concern. I can even deal with a full ctypes or pywin32 solution that subverts subprocess entirely if it is the only solution, as long as I can read from every stderr pipe asynchronously with immediate performance and no deadlocks. :)

A: 

What about using Twisted's FDs? http://twistedmatrix.com/documents/8.1.0/api/twisted.internet.fdesc.html

It's not asynchronous but it is non-blocking. For asynchronous stuff, can you port to using Twisted?
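
If you do port, the fully asynchronous route goes through the reactor rather than fdesc. A minimal sketch of watching one process's stderr with a ProcessProtocol might look like this (ChainMonitor is just a name I made up, "pA.exe" stands in for one of your real commands, and I haven't tried this on your particular Windows setup):

from twisted.internet import protocol, reactor

class ChainMonitor(protocol.ProcessProtocol):
    """Receives a child process's output as it arrives, without blocking."""
    def errReceived(self, data):
        # progress lines arrive here as soon as the child writes them
        print "progress:", data.rstrip()

    def outReceived(self, data):
        # stdout could be forwarded to the next process in the chain here
        pass

    def processEnded(self, reason):
        print "done:", reason.value
        reactor.stop()

reactor.spawnProcess(ChainMonitor(), "pA.exe", ["pA.exe"], env=None)
reactor.run()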

Gregory
I did look at Twisted before, but it is pretty much its own world, and due to all the internet-related stuff, I had issues finding things that were relevant to me. Thanks for that link; I'll see what I can do with it. I am kind of afraid, however, that it may end up suffering from the same PeekNamedPipe issue I wrote about in my question, since everything about Windows and non-blocking pipes seems to lead to that somehow.
Stigma
A: 

Can't you just do a non-blocking read from pA.stdout and pB.stdout? You will have a tight busy loop to manage, though.

http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python
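
For reference, on Unix a non-blocking read from a pipe boils down to something like this (a sketch; it relies on fcntl, which does not exist on Windows, so it may run into exactly the limitation described in the question):

import os
import errno
import fcntl

def set_nonblocking(pipe):
    # POSIX-only: flag the pipe's file descriptor as non-blocking
    fd = pipe.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def read_available(pipe, size=4096):
    # Return whatever is buffered right now, or '' if nothing is ready yet
    try:
        return os.read(pipe.fileno(), size)
    except OSError, e:
        if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            return ''
        raise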

goldsz
Python documentation says that using a pX.stdYYY.read() or .readline() loop can cause a deadlock if done while the process is running. Just the chance of it happening doesn't make me happy at all. :) The subprocdev thing in that thread seems to be the only one supporting Windows (the rest seems developed for Unix), but I had already seen that. It has been offered up for inclusion into Python 2.7 and 3.2, but that is far from certain from what I can read - nor is there anything I can download that I can see.
Stigma
+1  A: 

How bad is it to have to use threads? I encountered much the same problem and eventually decided to use threads to gather up all the data on a sub-process's stdout and stderr and put it onto a thread-safe queue which the main thread can read in a blocking fashion, without having to worry about the threading going on behind the scenes.

It's not clear what trouble you anticipate with a solution based on threads and blocking. Are you worried about having to make the rest of your code thread-safe? That shouldn't be an issue, since the IO threads wouldn't need to interact with any of the rest of your code or data. If you have very restrictive memory requirements or your pipeline is particularly long, then perhaps you may feel unhappy about spawning so many threads. I don't know enough about your situation to say whether this is likely to be a problem, but it seems to me that since you're already spawning off extra processes, a few threads to interact with them should not be a terrible burden. In my situation I have not found these IO threads to be particularly problematic.

My thread function looked something like this:

import Queue   # the thread-safe queue module (renamed "queue" in Python 3)

def simple_io_thread(pipe, queue, tag, stop_event):
    """
    Read line-by-line from pipe, writing (tag, line) to the
    queue. Also checks for a stop_event to give up before
    the end of the stream.
    """
    while True:
        line = pipe.readline()

        while True:
            try:
                # Post to the queue with a large timeout in case the
                # queue is full.
                queue.put((tag, line), block=True, timeout=60)
                break
            except Queue.Full:
                if stop_event.isSet():
                    break
                continue
        if stop_event.isSet() or line=="":
            break
    pipe.close()

When I start up the subprocess I do this:

import Queue
import subprocess
import threading

outputqueue = Queue.Queue(50)
stop_event = threading.Event()
process = subprocess.Popen(
    command,
    cwd=workingdir,
    env=env,
    shell=useshell,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
stderr_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stderr, outputqueue, "STDERR", stop_event)
)
stdout_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stdout, outputqueue, "STDOUT", stop_event)
)
stderr_thread.daemon = True
stdout_thread.daemon = True
stderr_thread.start()
stdout_thread.start()

Then when I want to read I can just block on outputqueue - each item read from it contains a tag identifying which pipe it came from and a line of text from that pipe. Very little code runs in a separate thread, and it only communicates with the main thread via a thread-safe queue (plus an event in case I need to give up early). Perhaps this approach would be useful and allow you to solve the problem with threads and blocking but without having to rewrite lots of code?
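
A consumer loop on the main thread might then look something like this (a sketch only; it relies on the objects created above and on the empty line that simple_io_thread posts when a pipe reaches EOF):

# one reader thread per pipe was started above
open_pipes = 2
while open_pipes:
    tag, line = outputqueue.get(block=True)   # blocks until something arrives
    if line == "":
        open_pipes -= 1                       # that pipe reached end-of-stream
        continue
    if tag == "STDERR":
        print "progress:", line.rstrip()
    else:
        print "output:", line.rstrip()
process.wait()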

(My solution is made more complicated because I sometimes wish to terminate the subprocesses early, and want to be sure that the threads will all finish. If that's not an issue you can get rid of all the stop_event stuff and it becomes pretty succinct.)
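
For what it's worth, the early-termination path is roughly this (a sketch; Popen.terminate() exists from Python 2.6 onward, and the join timeouts are arbitrary safety nets since the threads are daemonic anyway):

# ask the reader threads to give up, kill the child, then wait for the threads
stop_event.set()
process.terminate()
stderr_thread.join(timeout=5)
stdout_thread.join(timeout=5)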

Weeble
I would love to improve my answer if somebody would tell me why they're downvoting it. The question said that to "use threads with blocking" would involve a lot of rewriting, and I am suggesting that it might not.
Weeble
Apologies for the late reply (I was mistakenly under the impression my question had stopped getting attention). I'll give this a shot when time allows. I had dropped the issue in favor of more productive matters recently, but I definitely still want to solve it.
Stigma
+1  A: 

I assume that the process pipeline will not deadlock if it only uses stdin and stdout, and that the problem you're trying to solve is how to keep it from deadlocking when the processes write to stderr (and you have to deal with stderr possibly getting backed up).

If you're letting multiple processes write to stderr, you have to watch out for their output being intermingled. I'm guessing you have that sorted somehow; just putting it out there to be sure.

Be aware of the -u flag to python; it is helpful when testing to see if OS buffering is screwing you up.

If you want to emulate select() on file handles in win32, your only choice is to use PeekNamedPipe() and friends. I have a snippet of code that reads line-oriented output from multiple processes at once, which you may even be able to use directly -- try passing the list of proc.stderr handles to it and go.

import os
import time
import msvcrt

import win32pipe
import pywintypes


class NoLineError(Exception): pass
class NoMoreLineError(Exception): pass
class LineReader(object):
    """Helper class for multi_readlines."""
    def __init__(self, f):
        self.fd = f.fileno()
        self.osf = msvcrt.get_osfhandle(self.fd)
        self.buf = ''

    def getline(self):
        """Returns a line of text, or raises NoLineError, or NoMoreLineError."""
        try:
            _, avail, _ = win32pipe.PeekNamedPipe(self.osf, 0)
            bClosed = False
        except pywintypes.error:
            avail = 0
            bClosed = True

        if avail:
            self.buf += os.read(self.fd, avail)

        idx = self.buf.find('\n')
        if idx >= 0:
            ret, self.buf = self.buf[:idx+1], self.buf[idx+1:]
            return ret
        elif bClosed:
            if self.buf:
                # Leave an empty buffer rather than None so a subsequent
                # call raises NoMoreLineError instead of crashing on None.find().
                ret, self.buf = self.buf, ''
                return ret
            else:
                raise NoMoreLineError
        else:
            raise NoLineError


def multi_readlines(fs, timeout=0):
    """Read lines from |fs|, a list of file objects.
    The lines come out in arbitrary order, depending on which files
    have output available first."""
    if type(fs) not in (list, tuple):
        raise Exception("argument must be a list.")
    objs = [LineReader(f) for f in fs]
    for i,obj in enumerate(objs): obj._index = i
    while objs:
        yielded = 0
        for i,obj in enumerate(objs):
            try:
                yield (obj._index, obj.getline())
                yielded += 1
            except NoLineError:
                #time.sleep(timeout)
                pass
            except NoMoreLineError:
                del objs[i]
                break   # Because we mutated the array

        if not yielded:
            time.sleep(timeout)
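
For example, draining both stderr pipes from the question might look something like this (a sketch; pA and pB are the Popen objects from the question, and the 0.1-second timeout is just an arbitrary idle delay):

for which, line in multi_readlines([pA.stderr, pB.stderr], timeout=0.1):
    name = ("pA", "pB")[which]      # the index matches the position in the input list
    print "%s progress: %s" % (name, line.rstrip())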

I have never seen the "Peek returns 0 bytes even though data is available" issue myself. If this happens to others, I bet their libc is buffering their stdout/stderr before sending the data to the OS; there is nothing you can do about that from outside. You have to make the app use unbuffered output somehow (-u to python; win32/libc calls to modify the stderr file handle, ...)

The fact that you are seeing nothing, then a ton of updates, makes me think that your problem is buffering on the source end. win32 libc may buffer differently if it writes to a pipe rather than a console. Again, the best you can do from outside those programs is to aggressively drain their output.

Paul Du Bois
Apologies for the late reply. I'll play with your code, but a quick overview shows little difference internally from the snippets I already posted. Maybe the -u option will work, but that only applies to Python's own STDOUT and STDERR, if I understand correctly. In my case, it is the spawned processes with subprocess.PIPE-configured STDERR and STDOUT which are delaying and outputting in spurts. Please do correct me if I misunderstand this, though. Either way, I'll give it a shot nonetheless when time allows me to again. :)
Stigma
Yes, sorry; I should have made my point clearer: I don't think it's your general approach that is incorrect (since our code is substantially identical), and IMO it is likely that the problem is in the surrounding pieces.
Paul Du Bois