Given two InputStreams in Java, is there a way to merge them so that you end up with a single InputStream that yields the output of both streams? If so, how?
Not that I can think of. You would probably have to read the contents of the two streams into a byte[] and then create a ByteArrayInputStream from that.
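A minimal sketch of that buffering approach, assuming both streams are finite and small enough to fit in memory. The class name BufferedConcat and its methods are made up for illustration; only the java.io classes are real.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper: buffers both streams fully, then exposes one stream.
final class BufferedConcat {

    static InputStream concat(InputStream first, InputStream second) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        copy(first, out);   // all bytes of the first stream
        copy(second, out);  // followed by all bytes of the second
        return new ByteArrayInputStream(out.toByteArray());
    }

    // Reads the input until end of stream and appends everything to out.
    private static void copy(InputStream in, ByteArrayOutputStream out) throws IOException {
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
    }
}
```

Note that this reads both inputs to the end before the merged stream yields a single byte, so it is unsuitable for very large or never-ending streams.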
Dr. Egon Spengler: There's something very important I forgot to tell you.
Dr. Peter Venkman: What?
Dr. Egon Spengler: Don't cross the streams.
Dr. Peter Venkman: Why?
Dr. Egon Spengler: It would be bad.
Dr. Peter Venkman: I'm a little fuzzy on the whole "good/bad" thing here. What do you mean, "bad"?
Dr. Egon Spengler: Try to imagine all life as you know it stopping instantaneously and every molecule in your body exploding at the speed of light.
Dr. Ray Stantz: Total protonic reversal!
Dr. Peter Venkman: That's bad. Okay. All right, important safety tip. Thanks, Egon.
You can write a custom InputStream implementation that does this. Example:
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;

public class CatInputStream extends InputStream {
    // Streams still to be read, in order; the head is the current stream.
    private final Deque<InputStream> streams;

    public CatInputStream(InputStream... streams) {
        this.streams = new LinkedList<InputStream>();
        Collections.addAll(this.streams, streams);
    }

    // Closes the current stream and moves on to the next one.
    private void nextStream() throws IOException {
        streams.removeFirst().close();
    }

    @Override
    public int read() throws IOException {
        int result = -1;
        while (!streams.isEmpty()
                && (result = streams.getFirst().read()) == -1) {
            nextStream();
        }
        return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int result = -1;
        while (!streams.isEmpty()
                && (result = streams.getFirst().read(b, off, len)) == -1) {
            nextStream();
        }
        return result;
    }

    @Override
    public long skip(long n) throws IOException {
        long skipped = 0L;
        while (skipped < n && !streams.isEmpty()) {
            // skip() returns a long, and 0 does not necessarily mean end of stream.
            long thisSkip = streams.getFirst().skip(n - skipped);
            if (thisSkip > 0) {
                skipped += thisSkip;
            } else if (streams.getFirst().read() != -1) {
                skipped++; // one byte was consumed by read() instead
            } else {
                nextStream(); // the current stream is exhausted
            }
        }
        return skipped;
    }

    @Override
    public int available() throws IOException {
        return streams.isEmpty() ? 0 : streams.getFirst().available();
    }

    @Override
    public void close() throws IOException {
        while (!streams.isEmpty()) {
            nextStream();
        }
    }
}
This code isn't tested, so your mileage may vary.
As commented, it's not clear what you mean by merge.
Taking available input "randomly" from either stream is complicated by the blocking behaviour of streams and by InputStream.available not necessarily giving you a useful answer. You would need two threads reading from the streams and passing the data back through, say, java.io.Piped(In|Out)putStream (although those classes have issues). Alternatively, for some types of stream it may be possible to use a different interface, for instance java.nio non-blocking channels.
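A rough sketch of the two-thread idea, with one deliberate substitution: instead of the piped streams mentioned above it hands chunks over through a java.util.concurrent.LinkedBlockingQueue, which sidesteps the piped classes' quirks. The names InterleavingMerge, pump and drain are invented for this example, and the order in which chunks from the two sources arrive is not deterministic.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: one reader thread per source, chunks meet in a queue.
final class InterleavingMerge {
    // Sentinel chunk, compared by identity, marking the end of one source.
    private static final byte[] EOF = new byte[0];

    // Starts a daemon thread that copies chunks from in to the queue.
    private static void pump(InputStream in, BlockingQueue<byte[]> queue) {
        Thread t = new Thread(() -> {
            byte[] buf = new byte[4096];
            try {
                int n;
                while ((n = in.read(buf)) != -1) {
                    if (n > 0) {
                        queue.put(Arrays.copyOf(buf, n));
                    }
                }
            } catch (IOException | InterruptedException e) {
                // Sketch only: a failure is treated like the end of this source.
            } finally {
                queue.add(EOF); // the queue is unbounded, so add() does not block
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Collects whatever arrives from either source until both are finished.
    static byte[] drain(InputStream a, InputStream b) throws InterruptedException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        pump(a, queue);
        pump(b, queue);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int open = 2; // number of sources that have not reached EOF yet
        while (open > 0) {
            byte[] chunk = queue.take();
            if (chunk == EOF) {
                open--;
            } else {
                out.write(chunk, 0, chunk.length);
            }
        }
        return out.toByteArray();
    }
}
```

For brevity drain collects everything into a byte[]; a real merged InputStream would call queue.take() from its read methods instead. The unbounded queue also means a fast source can fill memory if the consumer is slow, so a bounded queue may be the better choice in practice.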
If you want the full contents of the first input stream followed by the second: new java.io.SequenceInputStream(s1, s2).
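For illustration, a small sketch of SequenceInputStream in use. The SequenceDemo class and its helper names are invented here; the two SequenceInputStream constructors (two streams, or an Enumeration of streams) are part of java.io.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class around java.io.SequenceInputStream.
final class SequenceDemo {

    // Two streams: the first is read to its end, then the second.
    static InputStream concat(InputStream s1, InputStream s2) {
        return new SequenceInputStream(s1, s2);
    }

    // Any number of streams, read in list order.
    static InputStream concat(List<? extends InputStream> parts) {
        return new SequenceInputStream(Collections.enumeration(parts));
    }

    // Reads a stream to its end and decodes the bytes as UTF-8 text.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

With two ByteArrayInputStreams holding "Hello, " and "world", readAll(concat(s1, s2)) gives "Hello, world". Nothing is buffered up front: the second stream is only touched once the first has reached its end.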
Here is an MVar implementation specific to byte arrays (make sure to add your own package declaration). From here, it is trivial to write an input stream over merged streams. I can post that too if requested.
import java.nio.ByteBuffer;

public final class MVar {
    // EMPTY: nothing stored; ONE: a single byte in b; MANY: length bytes left in bytes.
    private enum State {
        EMPTY, ONE, MANY
    }

    private final Object lock;
    private State state;
    private byte b;
    private ByteBuffer bytes;
    private int length;

    public MVar() {
        lock = new Object();
        state = State.EMPTY;
    }

    // Blocks until the MVar is empty, then stores a single byte.
    public void put(byte b) {
        synchronized (lock) {
            while (state != State.EMPTY) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Interruption is ignored; the loop simply waits again.
                }
            }
            this.b = b;
            state = State.ONE;
            lock.notifyAll();
        }
    }

    // Blocks until the MVar is empty, then stores a copy of the given range.
    public void put(byte[] bytes, int offset, int length) {
        if (length == 0) {
            return;
        }
        synchronized (lock) {
            while (state != State.EMPTY) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Interruption is ignored; the loop simply waits again.
                }
            }
            this.bytes = ByteBuffer.allocateDirect(length);
            this.bytes.put(bytes, offset, length);
            this.bytes.position(0);
            this.length = length;
            state = State.MANY;
            lock.notifyAll();
        }
    }

    // Blocks until at least one byte is stored, then removes and returns it.
    public byte take() {
        synchronized (lock) {
            while (state == State.EMPTY) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Interruption is ignored; the loop simply waits again.
                }
            }
            switch (state) {
                case ONE: {
                    state = State.EMPTY;
                    byte b = this.b;
                    lock.notifyAll();
                    return b;
                }
                case MANY: {
                    byte b = bytes.get();
                    state = --length <= 0 ? State.EMPTY : State.MANY;
                    lock.notifyAll();
                    return b;
                }
                default:
                    throw new AssertionError();
            }
        }
    }

    // Blocks until data is stored, then copies up to length bytes into the
    // given array and returns the number of bytes copied.
    public int take(byte[] bytes, int offset, int length) {
        if (length == 0) {
            return 0;
        }
        synchronized (lock) {
            while (state == State.EMPTY) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Interruption is ignored; the loop simply waits again.
                }
            }
            switch (state) {
                case ONE: {
                    bytes[offset] = b;
                    state = State.EMPTY;
                    lock.notifyAll();
                    return 1;
                }
                case MANY: {
                    if (this.length > length) {
                        // More is stored than requested: hand out a part, stay in MANY.
                        this.bytes.get(bytes, offset, length);
                        this.length -= length;
                        lock.notifyAll(); // the lock is already held here
                        return length;
                    }
                    // Hand out everything that is left and become EMPTY.
                    int remaining = this.length;
                    this.bytes.get(bytes, offset, remaining);
                    this.bytes = null;
                    state = State.EMPTY;
                    lock.notifyAll();
                    return remaining;
                }
                default:
                    throw new AssertionError();
            }
        }
    }
}