tags:

views:

949

answers:

2

I'm trying to run a process and do stuff with its input, output and error streams. The obvious way to do this is to use something like select(), but the only thing I can find in Java that does that is Selector.select(), which takes Channels. It doesn't appear to be possible to get a Channel from an InputStream or OutputStream (FileStream has a .getChannel() method but that doesn't help here)

So, instead I wrote some code to poll all the streams:

while( !out_eof || !err_eof )
{
    while( out_str.available() )
    {
        if( (bytes = out_str.read(buf)) != -1 )
        {
            // Do something with output stream
        }
        else
            out_eof = true;
    }
    while( err_str.available() )
    {
        if( (bytes = err_str.read(buf)) != -1 )
        {
            // Do something with error stream
        }
        else
            err_eof = true;
    }
    sleep(100);
}

which works, except that it never terminates. When one of the streams reaches end of file, available() returns zero so read() isn't called and we never get the -1 return that would indicate EOF.

One solution would be a non-blocking way to detect EOF. I can't see one in the docs anywhere. Alternatively is there a better way of doing what I want to do?

I see this question here: link text and although it doesn't exactly do what I want, I can probably use that idea, of spawning separate threads for each stream, for the particular problem I have now. But surely that isn't the only way to do it? Surely there must be a way to read from multiple streams without using a thread for each?

+3  A: 

As you said, the solution outlined in this Answer is the traditional way of reading both stdout and stderr from a Process. A thread-per-stream is the way to go, even though it is slightly annoying.

Matt Quail
+2  A: 

You will indeed have to go the route of spawning a Thread for each stream you want to monitor. If your use case allows for combining both stdout and stderr of the process in question you need only one thread, otherwise two are needed.

It took me quite some time to get it right in one of our projects where I have to launch an external process, take its output and do something with it while at the same time looking for errors and process termination and also being able to terminate it when the java app's user cancels the operation.

I created a rather simple class to encapsulate the watching part whose run() method looks something like this:

public void run() {
    BufferedReader tStreamReader = null;
    try {
        while (externalCommand == null && !shouldHalt) {
            logger.warning("ExtProcMonitor("
                           + (watchStdErr ? "err" : "out")
                           + ") Sleeping until external command is found");
            Thread.sleep(500);
        }
        if (externalCommand == null) {
            return;
        }
        tStreamReader =
                new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                        : externalCommand.getInputStream()));
        String tLine;
        while ((tLine = tStreamReader.readLine()) != null) {
            logger.severe(tLine);
            if (filter != null) {
                if (filter.matches(tLine)) {
                    informFilterListeners(tLine);
                    return;
                }
            }
        }
    } catch (IOException e) {
        logger.logExceptionMessage(e, "IOException stderr");
    } catch (InterruptedException e) {
        logger.logExceptionMessage(e, "InterruptedException waiting for external process");
    } finally {
        if (tStreamReader != null) {
            try {
                tStreamReader.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

On the calling side it looks like this:

    Thread tExtMonitorThread = new Thread(new Runnable() {

        public void run() {
            try {
                while (externalCommand == null) {
                    getLogger().warning("Monitor: Sleeping until external command is found");
                    Thread.sleep(500);
                    if (isStopRequested()) {
                        getLogger()
                                .warning("Terminating external process on user request");
                        if (externalCommand != null) {
                            externalCommand.destroy();
                        }
                        return;
                    }
                }
                int tReturnCode = externalCommand.waitFor();
                getLogger().warning("External command exited with code " + tReturnCode);
            } catch (InterruptedException e) {
                getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
            }
        }
    }, "ExtCommandWaiter");

    ExternalProcessOutputHandlerThread tExtErrThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
    ExternalProcessOutputHandlerThread tExtOutThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
    tExtMonitorThread.start();
    tExtOutThread.start();
    tExtErrThread.start();
    tExtErrThread.setFilter(new FilterFunctor() {

        public boolean matches(Object o) {
            String tLine = (String)o;
            return tLine.indexOf("Error") > -1;
        }
    });

    FilterListener tListener = new FilterListener() {
        private boolean abortFlag = false;

        public boolean shouldAbort() {
            return abortFlag;
        }

        public void matched(String aLine) {
            abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
        }

    };

    tExtErrThread.addFilterListener(tListener);
    externalCommand = new ProcessBuilder(aCommand).start();
    tExtErrThread.setProcess(externalCommand);
    try {
        tExtMonitorThread.join();
        tExtErrThread.join();
        tExtOutThread.join();
    } catch (InterruptedException e) {
        // when this happens try to bring the external process down 
        getLogger().severe("Aborted because auf InterruptedException.");
        getLogger().severe("Killing external command...");
        externalCommand.destroy();
        getLogger().severe("External command killed.");
        externalCommand = null;
        return -42;
    }
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();

    externalCommand = null;
    try {
        getLogger().warning("command exit code: " + tRetVal);
    } catch (IllegalThreadStateException ex) {
        getLogger().warning("command exit code: unknown");
    }
    return tRetVal;

Unfortunately I don't have to for a self-contained runnable example, but maybe this helps. If I had to do it again I would have another look at using the Thread.interrupt() method instead of a self-made stop flag (mind to declare it volatile!), but I leave that for another time. :)

Daniel Schneller