Given two InputStreams in Java, is there a way to merge them so that you end up with one InputStream that gives you the output of both streams? How?

A: 

Not that I can think of. You would probably have to read the contents of the two streams into a byte[] and then create a ByteArrayInputStream from that.
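
A minimal sketch of that idea, assuming both streams are small enough to hold in memory. The class name BufferedMerge and its merge helper are made up for illustration; the call blocks until both inputs have been read to the end.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class BufferedMerge {
    // Hypothetical helper: drains both streams into memory, first then second,
    // and serves the combined bytes back from a ByteArrayInputStream.
    static InputStream merge(InputStream first, InputStream second) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        copy(first, out);
        copy(second, out);
        return new ByteArrayInputStream(out.toByteArray());
    }

    // Copies everything from in to out until in reports end of stream.
    private static void copy(InputStream in, ByteArrayOutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
    }
}
```

Neither input is closed here; that is left to the caller.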

willcodejavaforfood
Upvote for a simple, easy to understand, workable solution. Might not have the required behaviour if blocking is important (or they are huge).
Tom Hawtin - tackline
+5  A: 

Dr. Egon Spengler: There's something very important I forgot to tell you.

Dr. Peter Venkman: What?

Dr. Egon Spengler: Don't cross the streams.

Dr. Peter Venkman: Why?

Dr. Egon Spengler: It would be bad.

Dr. Peter Venkman: I'm a little fuzzy on the whole "good/bad" thing here. What do you mean, "bad"?

Dr. Egon Spengler: Try to imagine all life as you know it stopping instantaneously and every molecule in your body exploding at the speed of light.

Dr. Ray Stantz: Total protonic reversal!

Dr. Peter Venkman: That's bad. Okay. All right, important safety tip. Thanks, Egon.

http://en.wikiquote.org/wiki/Ghostbusters

Ed Guiness
-1, since it is funny but not so useful
dfa
+1 for being funny, quoting something the "youngsters" don't watch anymore, and it being a community wiki (i.e. no garbage rep points!). :-)
James Schek
+1, since it is funny
matt b
+3  A: 

You can write a custom InputStream implementation that does this. Example:

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;

public class CatInputStream extends InputStream {
    private final Deque<InputStream> streams;

    public CatInputStream(InputStream... streams) {
        this.streams = new LinkedList<InputStream>();
        Collections.addAll(this.streams, streams);
    }

    private void nextStream() throws IOException {
        streams.removeFirst().close();
    }

    @Override
    public int read() throws IOException {
        int result = -1;
        while (!streams.isEmpty()
                && (result = streams.getFirst().read()) == -1) {
            nextStream();
        }
        return result;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int result = -1;
        while (!streams.isEmpty()
                && (result = streams.getFirst().read(b, off, len)) == -1) {
            nextStream();
        }
        return result;
    }

    @Override
    public long skip(long n) throws IOException {
        long skipped = 0L;
        while (skipped < n && !streams.isEmpty()) {
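            // skip() returns a long; a result of 0 does not by itself mean end of stream
            long thisSkip = streams.getFirst().skip(n - skipped);
            if (thisSkip > 0)
                skipped += thisSkip;
            else if (streams.getFirst().read() != -1)
                skipped++; // one more byte was consumed, so the stream is not exhausted yet
            else
                nextStream();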
        }
        return skipped;
    }

    @Override
    public int available() throws IOException {
        return streams.isEmpty() ? 0 : streams.getFirst().available();
    }

    @Override
    public void close() throws IOException {
        while (!streams.isEmpty())
            nextStream();
    }
}

This code isn't tested, so your mileage may vary.

Chris Jester-Young
Doesn't this do the same thing as SequenceInputStream, as suggested by Merzbow?
Sam Barnum
Sorry, but tackline suggested SequenceInputStream first (and I +1'd him for that). In SO, the earliest good answer wins; you never know if later answers are plagiarisms. Also, read my comment on tackline's answer for a comparison between SequenceInputStream and CatInputStream (I do take his point about using Collections.enumeration).
Chris Jester-Young
If the person who downvoted my answer did so because of my last comment, I apologise; I should explain better: if I write an answer that turns out to be a duplicate (or subset) of an earlier-posted answer, I usually delete it, knowing that I will never receive any points for it. On SO, it really is "Fastest Gun in the West" (search for that question title).
Chris Jester-Young
+6  A: 

As commented, it's not clear what you mean by merge.

Taking available input "randomly" from either is complicated by InputStream.available not necessarily giving you a useful answer and blocking behaviour of streams. You would need two threads to be reading from the streams and then passing back data through, say, java.io.Piped(In|Out)putStream (although those classes have issues). Alternatively for some types of stream it may be possible to use a different interface, for instance java.nio non-blocking channels.
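
A rough sketch of the two-thread approach. Instead of the piped stream classes it swaps in a BlockingQueue of byte chunks, which sidesteps their thread-liveness quirks. The class name InterleavingInputStream is made up, the queue is unbounded (so a slow reader can let memory grow), and a source that throws IOException is simply treated as finished. Those are all simplifying assumptions, not requirements of the technique.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class InterleavingInputStream extends InputStream {
    // Sentinel chunk, compared by identity, that a pump enqueues when its source ends.
    private static final byte[] END = new byte[0];

    private final BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<byte[]>();
    private int liveSources;       // sources that have not yet delivered END
    private byte[] current = END;  // chunk currently being handed out
    private int pos;               // read position inside current

    InterleavingInputStream(InputStream... sources) {
        liveSources = sources.length;
        for (InputStream source : sources) {
            Thread pump = new Thread(() -> pump(source));
            pump.setDaemon(true);
            pump.start();
        }
    }

    // Runs on its own thread: copies one source into the shared queue.
    private void pump(InputStream source) {
        byte[] buffer = new byte[4096];
        try {
            int n;
            while ((n = source.read(buffer)) != -1) {
                if (n > 0) {
                    chunks.add(Arrays.copyOf(buffer, n));
                }
            }
        } catch (IOException e) {
            // Simplification: a failing source is treated as if it had ended.
        } finally {
            chunks.add(END);
        }
    }

    @Override
    public int read() throws IOException {
        while (pos == current.length) {
            if (liveSources == 0) {
                return -1; // every source has signalled its end
            }
            try {
                current = chunks.take(); // blocks until some source has data
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
            pos = 0;
            if (current == END) {
                liveSources--;
            }
        }
        return current[pos++] & 0xFF;
    }
}
```

Bytes from each source stay in order relative to that source, but how the sources interleave depends on thread scheduling.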

If you want the full contents of the first input stream followed by the second: new java.io.SequenceInputStream(s1, s2).
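
For example, a small sketch of the two-stream constructor. The class name SequenceDemo and the readAll helper are only for illustration, and the inputs are in-memory streams so the snippet is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;

final class SequenceDemo {
    // Illustrative helper: reads a stream of ASCII bytes to the end as a String.
    static String readAll(InputStream in) throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        while ((c = in.read()) != -1) {
            sb.append((char) c);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        InputStream s1 = new ByteArrayInputStream("Hello, ".getBytes(StandardCharsets.US_ASCII));
        InputStream s2 = new ByteArrayInputStream("world".getBytes(StandardCharsets.US_ASCII));
        // Closing the SequenceInputStream also closes the underlying streams.
        try (InputStream merged = new SequenceInputStream(s1, s2)) {
            System.out.println(readAll(merged)); // prints "Hello, world"
        }
    }
}
```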

Tom Hawtin - tackline
Oh, very nice, I just learnt something new. SequenceInputStream is essentially identical to my CatInputStream, but using legacy Enumerations instead of, say, a LinkedList. :-)
Chris Jester-Young
As a hack to the first part of your answer, it's hard to solve in the general case but for specific cases of FileInputStream (and maybe sockets too?) you can instanceof/cast and create a channel out of it. (The other streams can use Channels.newChannel to create a consistent interface, but won't have the non-blocking qualities required.)
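
Roughly what I mean, as a sketch only. ChannelAdapter and toChannel are made-up names; getChannel and Channels.newChannel are the real calls.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class ChannelAdapter {
    // Hypothetical helper: gives every stream a channel interface.
    static ReadableByteChannel toChannel(InputStream in) {
        if (in instanceof FileInputStream) {
            // A FileInputStream can hand out its underlying FileChannel directly.
            return ((FileInputStream) in).getChannel();
        }
        // Any other stream gets a blocking wrapper with the same interface.
        return Channels.newChannel(in);
    }
}
```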
Chris Jester-Young
Collections.enumeration is your friend. I forgot a part of my first part - will edit.
Tom Hawtin - tackline
Actually FileChannel is not a SelectableChannel, although SocketChannel is. So I believe you're a bit stuck with files. Until "More NIO Features" in JDK7 (probably).
Tom Hawtin - tackline
NIO2 ftw; it will feature asynchronous I/O too, which will make implementing this merged-channel business much more fun (a callback on any input channel causes a callback to the merged channel). :-)
Chris Jester-Young
My desk used to be next to that of the spec lead. :-)
Tom Hawtin - tackline
Hah, what a small world. :-D Alan Bateman got in touch with me late last year, regarding my efforts to port NIO2 to IcedTea (I even have an SO question about it); sadly on that front, I since got a full-time job and never found the time to return to it. But, the good news is that the work's not wasted; GNU/Andrew from Red Hat (who does a lot of work with IcedTea) has since picked it up: http://blog.fuseyism.com/index.php/2008/11/21/icedtea-18-released/
Chris Jester-Young
+4  A: 

java.io.SequenceInputStream might be what you need. It accepts an enumeration of streams, and will output the contents of the first stream, then the second, and so on until all streams are empty.
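
A short sketch of the enumeration form for more than two streams, using Collections.enumeration to adapt an ordinary list. The class name SequenceManyDemo and the concat helper are illustrative only.

```java
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Collections;
import java.util.List;

final class SequenceManyDemo {
    // Illustrative helper: concatenates the streams in list order.
    static InputStream concat(List<? extends InputStream> parts) {
        // SequenceInputStream wants an Enumeration, so adapt the list.
        return new SequenceInputStream(Collections.enumeration(parts));
    }
}
```

Usage would look like concat(Arrays.asList(s1, s2, s3)); each stream is read to its end and closed before the next one is opened for reading.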

A: 

Here is an MVar implementation specific to byte arrays (make sure to add your own package definition). From here, it is trivial to write an input stream on merged streams. I can post that too if requested.

import java.nio.ByteBuffer;

public final class MVar {

  private static enum State {
    EMPTY, ONE, MANY
  }

  private final Object lock;

  private State state;

  private byte b;

  private ByteBuffer bytes;
  private int length;

  public MVar() {
    lock = new Object();
    state = State.EMPTY;
  }

  public final void put(byte b) {
    synchronized (lock) {
      while (state != State.EMPTY) {
        try {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      this.b = b;
      state = State.ONE;
      lock.notifyAll();
    }
  }

  public final void put(byte[] bytes, int offset, int length) {
    if (length == 0) {
      return;
    }
    synchronized (lock) {
      while (state != State.EMPTY) {
        try {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      this.bytes = ByteBuffer.allocateDirect(length);
      this.bytes.put(bytes, offset, length);
      this.bytes.position(0);
      this.length = length;
      state = State.MANY;
      lock.notifyAll();
    }
  }

  public final byte take() {
    synchronized (lock) {
      while (state == State.EMPTY) {
        try {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      switch (state) {
      case ONE: {
        state = State.EMPTY;
        byte b = this.b;
        lock.notifyAll();
        return b;
      }
      case MANY: {
        byte b = bytes.get();
        state = --length <= 0 ? State.EMPTY : State.MANY;
        lock.notifyAll();
        return b;
      }
      default:
        throw new AssertionError();
      }
    }
  }

  public final int take(byte[] bytes, int offset, int length) {
    if (length == 0) {
      return 0;
    }
    synchronized (lock) {
      while (state == State.EMPTY) {
        try {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      switch (state) {
      case ONE:
        bytes[offset] = b;
        state = State.EMPTY;
        lock.notifyAll();
        return 1;
      case MANY:
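        if (this.length > length) {
          // Partial take: hand over only what the caller asked for.
          this.bytes.get(bytes, offset, length);
          this.length -= length;
          lock.notifyAll(); // the lock is already held here, so no nested synchronized block is needed
          return length;
        }
        // Full take: drain what is left and mark the MVar empty.
        int remaining = this.length;
        this.bytes.get(bytes, offset, remaining);
        this.bytes = null;
        this.length = 0;
        state = State.EMPTY;
        lock.notifyAll();
        return remaining;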
      default:
        throw new AssertionError();
      }
    }
  }
}
ScootyPuff