Hello there, fellow members. My goal is to create (or reuse an existing) InputStream implementation (say, MergeInputStream) that tries to read from multiple InputStreams and returns the first result. After that, it should stop blocking and stop reading from all InputStreams until the next mergeInputStream.read() call. I was quite surprised that I couldn't find any such tool. The thing is: none of the source InputStreams is really finite (not a file, for example, but System.in, a socket, or the like), so I cannot use SequenceInputStream. I understand that this will probably require some multi-threading mechanism, but I have absolutely no idea how to do it. I tried to google it, with no result.

+2  A: 

The problem of reading input from multiple sources and serializing them into one stream is preferably solved using SelectableChannel and Selector. This however requires that all sources are able to provide a selectable channel. This may or may not be the case.
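As a rough illustration of the selector approach, here is a minimal sketch. It assumes every source can be exposed as a non-blocking selectable channel; two java.nio.channels.Pipe instances stand in for real sources such as sockets. The class name SelectorMergeSketch and the methods readFirstAvailable and demo are made up for this example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical helper, not part of any library.
final class SelectorMergeSketch {

    /** Returns true while at least one registered source has not been cancelled. */
    private static boolean hasOpenSource(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            if (key.isValid()) return true;
        }
        return false;
    }

    /** Blocks until some registered channel delivers a byte; returns -1 once all sources have ended. */
    static int readFirstAvailable(Selector selector) throws IOException {
        ByteBuffer one = ByteBuffer.allocate(1);
        while (hasOpenSource(selector)) {
            selector.select(); // blocks until at least one channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (!key.isValid() || !key.isReadable()) continue;
                int n = ((ReadableByteChannel) key.channel()).read(one);
                if (n == 1) return one.get(0) & 0xFF; // unsigned byte, like InputStream.read()
                if (n < 0) key.cancel();              // this source reached end of stream
            }
        }
        return -1;
    }

    /** Registers two pipes, writes one byte into the second, and reads it back through the selector. */
    static int demo() throws IOException {
        Pipe first = Pipe.open();
        Pipe second = Pipe.open();
        try (Selector selector = Selector.open()) {
            for (Pipe p : new Pipe[] {first, second}) {
                p.source().configureBlocking(false);
                p.source().register(selector, SelectionKey.OP_READ);
            }
            second.sink().write(ByteBuffer.wrap(new byte[] {42}));
            return readFirstAvailable(selector);
        } finally {
            first.sink().close();
            first.source().close();
            second.sink().close();
            second.source().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

No extra threads and no sleeping are needed here: select() parks the calling thread until one of the channels has something to read.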

If selectable channels are not available, you could choose to solve it with a single thread by letting the read-implementation do the following: For each input stream is, check if is.available() > 0, and if so return is.read(). Repeat this procedure until some input stream has data available.

This method, however, has two major drawbacks:

  1. Not all implementations of InputStream implement available() such that it returns 0 if and only if read() would block. As a result, data may never be read from such a stream, even though is.read() would return a value. Whether this should be considered a bug is debatable, as the documentation merely states that it should return an "estimate" of the number of bytes available.

  2. It uses a so-called "busy-loop", which basically means that you'll either need to put a sleep in the loop (which results in reading latency) or hog the CPU unnecessarily.
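The single-threaded polling approach could be sketched roughly as follows. The class name PollingMergedInputStream and the sleepMillis parameter are made up for this example, and the sketch assumes that every source reports a positive available() whenever a read would not block.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

// Hypothetical sketch of the available()-polling approach.
final class PollingMergedInputStream extends InputStream {

    private final InputStream[] sources;
    private final long sleepMillis; // pause between polling rounds

    PollingMergedInputStream(long sleepMillis, InputStream... sources) {
        this.sleepMillis = sleepMillis;
        this.sources = sources.clone();
    }

    @Override
    public int read() throws IOException {
        while (true) {
            // Return a byte from the first source that claims to have data.
            for (InputStream is : sources) {
                if (is.available() > 0) {
                    return is.read();
                }
            }
            // Nothing available anywhere: sleep instead of spinning at full speed.
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while polling");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream empty = new ByteArrayInputStream(new byte[0]);
        InputStream data = new ByteArrayInputStream(new byte[] {7, 8});
        InputStream merged = new PollingMergedInputStream(10, empty, data);
        System.out.println(merged.read());
        System.out.println(merged.read());
    }
}
```

Both drawbacks show up directly in this sketch. A source that only overrides read() inherits InputStream's default available(), which always returns 0, so it would never be picked. The loop also cannot detect end of stream, because an exhausted source reports 0 as well, so read() never returns -1. In addition, the scan always starts at the first source, which can starve later ones.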

Your third option is to deal with the blocking reads by spawning one thread for each input stream. This however will require careful synchronization and possibly some overhead if you have a very high number of input streams to read from. The code below is a first attempt to solve it. I'm by no means certain that it is sufficiently synchronized, or that it manages the threads in the best possible way.

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergedInputStream extends InputStream {

    AtomicInteger openStreamCount;
    BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    InputStream[] sources;

    public MergedInputStream(InputStream... sources) {
        this.sources = sources;
        openStreamCount = new AtomicInteger(sources.length);
        for (int i = 0; i < sources.length; i++)
            new ReadThread(i).start();
    }


    public void close() throws IOException {
        String ex = "";
        for (InputStream is : sources) {
            try {
                is.close();
            } catch (IOException e) {
                ex += e.getMessage() + " ";
            }
        }
        if (ex.length() > 0)
            throw new IOException(ex.substring(0, ex.length() - 1));
    }


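    public int read() throws IOException {
        try {
            while (true) {
                // Wait only briefly, so that end-of-stream is re-checked even
                // if every source finishes while we are waiting for data.
                Integer data = buf.poll(100, TimeUnit.MILLISECONDS);
                if (data != null)
                    return data;
                // Reader threads decrement the count only after their last
                // put(), so "no open streams and empty buffer" means EOF.
                if (openStreamCount.get() == 0 && buf.isEmpty())
                    return -1;
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }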


    private class ReadThread extends Thread {

        private final int src;
        public ReadThread(int src) {
            this.src = src;
        }

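        public void run() {
            try {
                int data;
                while ((data = sources[src].read()) != -1)
                    buf.put(data);
            } catch (IOException ioex) {
                // Treat a failing (or closed) source as having reached its end.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                // Always count this source as finished, whatever ended the loop.
                openStreamCount.decrementAndGet();
            }
        }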
    }
}
aioobe
The comments in the forums you are linking to are by no means an authoritative reference.
Grodriguez
A: 

I can think of three ways to do this:

  • Use non-blocking I/O (API documentation). This is the cleanest solution.
  • Multiple threads, one for each merged input stream. The threads would block on the read() method of the associated input stream, then notify the MergeInputStream object when data becomes available. The read() method in MergeInputStream would wait for this notification, then read data from the corresponding stream.
  • Single thread with a busy loop. Your MergeInputStream.read() method would need to loop, checking the available() method of every merged input stream. If no data is available, sleep a few ms. Repeat until data becomes available in one of the merged input streams.
Grodriguez
A multithreaded solution would result in a lot of headaches due to synchronization (at least for me). `available` is not ever guaranteed to return a non-zero value. Thus, using SelectableChannels is the only sane alternative imho.
aioobe
`available()` is not guaranteed to return the total number of bytes that can be read without blocking, but should return non-zero if there is any data available (i.e. if a call to `read()` will not block). Many `InputStream` implementations will return `1` if there is data available, `0` otherwise.
Grodriguez
The one-thread scheme works, and works fine. I will study NIO and try to implement it in future tasks. Thank you both.
Frozen Spider
@Frozen Spider... sure, it will work fine in some cases. In other cases it will fail miserably. See for instance [SSLSocket.available() == 0 always](http://forums.sun.com/thread.jspa?threadID=5327431). What I'm saying is that there are no guarantees whatsoever, and whenever your program's I/O operations seem flaky, you'll have to suspect and investigate whether the error lies in this code.
aioobe
So, I have no choice but to rewrite the whole module... Thank you for the link.
Frozen Spider
@aioobe: `SSLSocket.available()` does NOT return 0 always (and if it did, it would be a bug, such as this one for mid profile: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4726871). `SSLSocket.available()` returns the number of bytes successfully decrypted from the underlying stream but not yet read by the application, which **is** an estimate of the number of bytes that can be read without blocking. It still holds that when `available()` returns `>0`, then the next call to `read()` will not block.
Grodriguez
@Frozen Spider: You do not need to rewrite the whole module. The single thread approach is perfectly fine. Non-blocking I/O is also an option of course but all three alternatives I suggested are feasible.
Grodriguez
@Grodriguez, fine. You say *when available() returns >0, then the next call to read() will not block*, and that's correct. However, what's important here is the opposite: **when available() returns 0, then the next call to read() may not block**. That is, even if available() says "there are 0 available bytes", it may be the case that there *are* bytes available. This means that your third option may simply spin forever, without realizing that there is data to be read. (Besides, the bug report you posted is not actually a report of a bug, but a request for enhancement.)
aioobe
You say *when available() returns 0, then the next call to read() may not block*. That is of course correct (for example new data may have arrived between the two calls), but it does **not** mean in any way that `available()` may "return zero forever". This **would** be a bug. If you search Sun's bug parade you'll see that some similar issues have been filed as (and accepted as) bugs, not "enhancements".
Grodriguez
The `available()` method should return an *estimate* of the number of bytes available on the input stream. If an implementation, for one reason or another, can't tell the number of bytes available without blocking, and "estimates" the number of bytes available to 0, it's not violating the contract of the InputStream. (It *would* be a violation to "estimate" the number of bytes to >0 if the next read turned out to block.)
aioobe
I have yet to see an implementation that does not know whether it can read "some" data or not, which is all that is needed in order to return either zero or non-zero from `available`. What you are saying is in essence that `available` is completely useless, which it is not -- you only need to know how it works (many people don't). As I said, Sun's bug parade has some examples of bugs for `InputStream` implementations that always return zero from `available()`.
Grodriguez
BTW if an implementation could not tell whether "some" data (even if it is only 1 byte) can be read without blocking, then you would not be able to implement NIO in non-blocking mode for that data source either. Don't you realize that the information you need in order to implement `available()` is exactly the same information you need in order to check whether a `SelectableChannel` is available for reading?
Grodriguez
In case the OP is only using implementations of InputStream with documented, good behaviour (i.e., return 0 iff a read would block) he could of course go for a single-threaded solution. If the OP is after a reusable and robust implementation he would either need to turn to SelectableChannels or a multi-threaded solution. It is true that not all InputStreams are "selectable". If the OP is stuck with such InputStreams, then a (carefully synchronized) multi-threaded solution is, as far as I can see, the only way to go.
aioobe
BTW, [here is an explanation](http://forums.sun.com/thread.jspa?threadID=5404977) why SSL sockets available() may return 0, even though it has bytes available: *InputStream.available() on an SSL socket always returns zero because it can't tell how much data is available to read without blocking unless it decrypts some data, and it can't in general do that without blocking because SSL comes in discrete records. So it returns zero.*
aioobe
@aioobe: That is just a comment made by a user, not an official statement from anyone at Sun. In my opinion it is plain wrong.
Grodriguez
The chances of an `InputStream` not being able to implement `available` properly (a simple implementation could just return 1 if some data can be read without blocking, 0 otherwise) are exactly the same as the chances of coming across a `SelectableChannel` that cannot tell whether it is available for reading. It is theoretically possible of course, but a non-issue in practice. There is no reason why a solution based on SelectableChannels would be more "reusable and robust" than a solution based on InputStreams. Both are perfectly feasible, and both can be made reusable and robust.
Grodriguez
This discussion leads nowhere. We both know the other person's standpoint. You claim that all correctly implemented InputStreams return 0 from available() if and only if a read would block, and I'm claiming that this is by no means guaranteed by [the documentation](http://download.oracle.com/javase/6/docs/api/java/io/InputStream.html#available%28%29).
aioobe
What I claim is that your argument against InputStreams can also be applied to SelectableChannels, yet you keep insisting that for a "reusable and robust implementation", SelectableChannels should be used instead of InputStreams.
Grodriguez