I have two threads. One of them writes to a PipedOutputStream, and the other reads from the corresponding PipedInputStream. The background is that one thread downloads data from a remote server and multiplexes it to several other threads through piped streams.

The problem is that sometimes (especially when downloading large (>50 MB) files) I get java.io.IOException: Pipe broken when trying to read from the PipedInputStream.
The Javadoc says: "A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive."
That is true: my writing thread really does die after writing all of its data to the PipedOutputStream.

Any solutions? How can I prevent PipedInputStream from throwing this exception? I want to be able to read all the data that was written to the PipedOutputStream even after the writing thread has finished its work. (If anybody knows how to keep the writing thread alive until all the data has been read, that solution is also acceptable.)
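For reference, the failure is easy to reproduce: when the writing thread dies without its end of the pipe being closed, the reader can still drain the buffered bytes, but the next read throws. A minimal sketch (invented demo code, not the asker's program; note the second read blocks for roughly two seconds while PipedInputStream retries before giving up):

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class BrokenPipeDemo {
    static String readAfterWriterDies() throws Exception {
        PipedOutputStream pout = new PipedOutputStream();
        PipedInputStream pin = new PipedInputStream(pout);

        Thread writer = new Thread(() -> {
            try {
                pout.write(new byte[] {1, 2, 3});
                // deliberately no pout.close(): the thread just ends
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();
        writer.join(); // the writing thread is now dead

        byte[] buf = new byte[16];
        pin.read(buf); // the 3 buffered bytes are still readable
        try {
            pin.read(); // buffer empty + writer dead -> IOException
            return "no exception";
        } catch (IOException expected) {
            return expected.getMessage(); // "Pipe broken"
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAfterWriterDies());
    }
}
```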

+6  A: 

Use a java.util.concurrent.CountDownLatch, and do not let the first thread end before the second one has signaled that it has finished reading from the pipe.

Update: quick and dirty code to illustrate my comment below

final PipedInputStream pin = getInputStream();
final PipedOutputStream pout = getOutputStream();

final CountDownLatch latch = new CountDownLatch(1);

InputStream in = new InputStream() {

    @Override
    public int read() throws IOException {
        return pin.read();
    }

    @Override
    public void close() throws IOException {
        super.close();
        latch.countDown();
    }
};

OutputStream out = new OutputStream() {

    @Override
    public void write(int b) throws IOException {
        pout.write(b);
    }

    @Override
    public void close() throws IOException {
        while (latch.getCount() != 0) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                // too bad
            }
        }
        super.close();
    }
};

// give the streams to your threads, they don't know a latch ever existed
threadOne.feed(in);
threadTwo.feed(out);
Jerome
+1 Never came across CountDownLatch before, looks useful.
Benj
Nice feature, it's definitely +1, but it needs one instance of CountDownLatch shared between different threads. This is not very good, because the writing and reading threads are created in different places and I want them not to know about each other. My architecture is such that they only know they should write/read to/from a given stream.
levanovd
Then maybe you could extend Piped[In|Out]putStream to handle the manipulation of the CountDownLatch.
Jerome
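That suggestion might look like this (a sketch; the class and field names are invented). The latch is hidden inside the pipe pair, so the reading and writing threads only ever see ordinary streams:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.CountDownLatch;

class LatchedPipe {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Writer side: close() blocks until the reader has closed its end,
    // keeping the writing thread alive so the pipe is never "broken".
    final PipedOutputStream out = new PipedOutputStream() {
        @Override
        public void close() throws IOException {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            super.close();
        }
    };

    // Reader side: closing it releases the writer's close().
    final PipedInputStream in;

    LatchedPipe() throws IOException {
        in = new PipedInputStream(out) {
            @Override
            public void close() throws IOException {
                super.close();
                latch.countDown();
            }
        };
    }
}
```

One caveat: the reader must close its stream once it has read everything it expects; it cannot wait for EOF, because EOF only arrives after the writer's close() completes.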
or write your own InputStream/OutputStream that wraps the pipe and the latch (see the sample code I added to my answer)
Jerome
Thank you very much, that was exactly what I needed! I used your solution and implemented something like it. No more problems with piped streams, I'm happy! )
levanovd
A: 

PipedInputStream and PipedOutputStream are broken (with regard to threading). They assume each instance is bound to a particular thread. This is bizarre. I suggest using your own (or at least a different) implementation.

Tom Hawtin - tackline
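A minimal version of rolling your own (a sketch, not production code; `QueueInputStream` is an invented name): back the pipe with a BlockingQueue, which blocks when full or empty but never checks whether the threads on either end are alive, so a finished producer is harmless. The producer signals end-of-stream explicitly instead of just dying:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueInputStream extends InputStream {
    private static final int EOF = -1;
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1024);

    // Producer side: blocks while the queue is full.
    public void put(int b) throws InterruptedException {
        queue.put(b & 0xFF);
    }

    // Producer signals end-of-stream explicitly.
    public void done() throws InterruptedException {
        queue.put(EOF);
    }

    @Override
    public int read() throws IOException {
        try {
            return queue.take(); // blocks until a byte or the EOF marker arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while reading", e);
        }
    }
}
```

Queueing one boxed Integer per byte is slow; a real implementation would queue byte[] chunks, but the thread-liveness point is the same.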
A: 

Are you closing your PipedOutputStream when the thread that's using it ends? You need to do this so the bytes in it will get flushed to the corresponding PipedInputStream.

wds
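To illustrate the point, a minimal sketch (invented demo code): when the writer closes the pipe in a finally block before its thread exits, the reader drains the buffered bytes and then gets a clean end-of-stream instead of an exception.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class CloseBeforeExit {
    static int transfer() throws Exception {
        PipedOutputStream pout = new PipedOutputStream();
        PipedInputStream pin = new PipedInputStream(pout);

        Thread writer = new Thread(() -> {
            try {
                try {
                    pout.write("payload".getBytes());
                } finally {
                    pout.close(); // marks the pipe closed-by-writer
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        writer.start();
        writer.join(); // the writer thread is dead, but the pipe was closed

        int count = 0;
        while (pin.read() != -1) {
            count++; // drains the 7 buffered bytes, then a clean EOF
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transfer()); // prints 7
    }
}
```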
Yes, I close it.
levanovd
I really think something else is going wrong here; in any case you should never get a broken pipe if the writing thread ended normally. If its data doesn't fit in the `PipedInputStream` buffer, it should just block until there's room.
wds