Hello there, fellow members. My goal is to create (or reuse an existing) InputStream implementation (say, MergeInputStream) that will try to read from multiple InputStreams and return the first result. After that it should release the block and stop reading from all of the InputStreams until the next mergeInputStream.read() call. I was quite surprised that I couldn't find any such tool. The thing is, none of the source InputStreams is finite (not a file, for example, but System.in, a socket or the like), so I cannot use SequenceInputStream. I understand that this will probably require some multi-threading mechanism, but I have absolutely no idea how to do it. I tried googling, with no result.
The problem of reading input from multiple sources and serializing them into one stream is preferably solved using SelectableChannel and Selector. This, however, requires that all sources are able to provide a selectable channel. This may or may not be the case.
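To make the selector approach concrete, here is a minimal sketch. It assumes every source is both a SelectableChannel and a ReadableByteChannel; java.nio.channels.Pipe is used only so that the example is self-contained, and the class name SelectorMergeSketch and its demo() helper are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical helper: merges several selectable, readable channels.
class SelectorMergeSketch {
    private final Selector selector;
    private int openChannels;

    SelectorMergeSketch(SelectableChannel... sources) throws IOException {
        selector = Selector.open();
        for (SelectableChannel source : sources) {
            source.configureBlocking(false);               // required before registering
            source.register(selector, SelectionKey.OP_READ);
            openChannels++;
        }
    }

    // Blocks until some channel delivers data and returns the number of bytes
    // copied into dst, or -1 once every channel has reached end-of-stream.
    // Assumption: dst has remaining space when this is called.
    int read(ByteBuffer dst) throws IOException {
        while (openChannels > 0) {
            selector.select();                             // wait for a readable channel
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                int n = ((ReadableByteChannel) key.channel()).read(dst);
                if (n > 0) {
                    return n;                              // first result wins
                }
                if (n < 0) {                               // this source is finished
                    key.channel().close();
                    openChannels--;
                }
            }
        }
        return -1;
    }

    // Demo: two pipes, data is written only to the second one.
    static String demo() {
        try {
            Pipe first = Pipe.open();
            Pipe second = Pipe.open();
            SelectorMergeSketch merged =
                    new SelectorMergeSketch(first.source(), second.source());
            second.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));
            ByteBuffer buffer = ByteBuffer.allocate(64);
            int n = merged.read(buffer);
            return new String(buffer.array(), 0, n, StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No extra threads are involved here; the single select() call does the waiting. Sockets can be registered the same way through SocketChannel, but a plain InputStream such as System.in cannot, which is the limitation mentioned above.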
If selectable channels are not available, you could choose to solve it with a single thread by letting the read implementation do the following: for each input stream is, check whether is.available() > 0, and if so return is.read(). Repeat this procedure until some input stream has data available.
This method, however, has two major drawbacks:
- Not all implementations of InputStream implement available() in such a way that it returns 0 if and only if read() will block. The result is, naturally, that data may not be read from such a stream even though is.read() would return a value. Whether this should be considered a bug is debatable, as the documentation merely states that it should return an "estimate" of the number of bytes available.
- It uses a so-called "busy loop", which basically means that you'll either need to put a sleep in the loop (which results in read latency) or hog the CPU unnecessarily.
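For illustration, the single-threaded polling variant could look roughly like the sketch below. The names PollingMergeSketch, readFirstAvailable and the sleepMillis parameter are made up for this example, and ByteArrayInputStream stands in for the real sources only because its available() is exact. Note that the loop has no way to detect end-of-stream, so it keeps polling forever if no source ever reports data.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper showing the available()-based polling loop.
class PollingMergeSketch {

    // Returns the next byte from the first source that reports available data.
    // Sleeps sleepMillis between polling rounds to avoid hogging the CPU,
    // which is also the source of the read latency mentioned above.
    static int readFirstAvailable(long sleepMillis, InputStream... sources)
            throws IOException, InterruptedException {
        while (true) {
            for (InputStream is : sources) {
                if (is.available() > 0) {
                    return is.read();
                }
            }
            Thread.sleep(sleepMillis);
        }
    }

    // Demo: the first source has nothing to offer, the second one does.
    static int demo() {
        InputStream empty = new ByteArrayInputStream(new byte[0]);
        InputStream data = new ByteArrayInputStream(new byte[] {'h', 'i'});
        try {
            return readFirstAvailable(10, empty, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println((char) demo());
    }
}
```

With sources whose available() always returns 0 (the first drawback), this loop would never return, so treat it as a fallback rather than a general solution.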
Your third option is to deal with the blocking reads by spawning one thread for each input stream. This however will require careful synchronization and possibly some overhead if you have a very high number of input streams to read from. The code below is a first attempt to solve it. I'm by no means certain that it is sufficiently synchronized, or that it manages the threads in the best possible way.
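import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MergedInputStream extends InputStream {

    // Marker a reader thread puts on the queue once its source has ended.
    private static final int EOF = -1;

    // Capacity 1: a reader thread blocks until its byte has been consumed.
    private final BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    private final InputStream[] sources;

    // Number of sources that have not signalled EOF yet; guarded by read().
    private int openStreamCount;

    public MergedInputStream(InputStream... sources) {
        this.sources = sources;
        openStreamCount = sources.length;
        for (int i = 0; i < sources.length; i++)
            new ReadThread(i).start();
    }

    @Override
    public void close() throws IOException {
        // Close every source, collecting the messages of any failures.
        String ex = "";
        for (InputStream is : sources) {
            try {
                is.close();
            } catch (IOException e) {
                ex += e.getMessage() + " ";
            }
        }
        if (ex.length() > 0)
            throw new IOException(ex.substring(0, ex.length() - 1));
    }

    @Override
    public synchronized int read() throws IOException {
        try {
            // Counting EOF markers here (instead of in the reader threads)
            // means a byte still sitting in the queue is never lost, and
            // read() cannot block after the last source has ended.
            while (openStreamCount > 0) {
                int data = buf.take();
                if (data != EOF)
                    return data;
                openStreamCount--;
            }
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    private class ReadThread extends Thread {
        private final int src;

        public ReadThread(int src) {
            this.src = src;
        }

        @Override
        public void run() {
            try {
                try {
                    int data;
                    while ((data = sources[src].read()) != -1)
                        buf.put(data);
                } catch (IOException ioex) {
                    // A failing source is treated as having reached its end.
                }
                buf.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}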
I can think of three ways to do this:
- Use non-blocking I/O (API documentation). This is the cleanest solution.
- Multiple threads, one for each merged input stream. The threads would block on the read() method of the associated input stream, then notify the MergeInputStream object when data becomes available. The read() method in MergeInputStream would wait for this notification, then read data from the corresponding stream.
- Single thread with a busy loop. Your MergeInputStream.read() method would need to loop, checking the available() method of every merged input stream. If no data is available, sleep a few ms. Repeat until data becomes available in one of the merged input streams.