tags:

views:

42

answers:

1

When I create a channel out of the InputStream with Channels.newChannel(is) java standard library returns a ReadableByteChannelImpl, which is:

   private static class ReadableByteChannelImpl
       extends AbstractInterruptibleChannel    // Not really interruptible
       implements ReadableByteChannel
   {
       InputStream in;
       private static final int TRANSFER_SIZE = 8192;
       private byte buf[] = new byte[0];
       private boolean open = true;
       private Object readLock = new Object();

       ReadableByteChannelImpl(InputStream in) {
           this.in = in;
       }

       public int read(ByteBuffer dst) throws IOException {
           int len = dst.remaining();
           int totalRead = 0;
           int bytesRead = 0;
           synchronized (readLock) {
               while (totalRead < len) {
                   int bytesToRead = Math.min((len - totalRead),
                                              TRANSFER_SIZE);
                   if (buf.length < bytesToRead)
                       buf = new byte[bytesToRead];
                   if ((totalRead > 0) && !(in.available() > 0))
                       break; // block at most once
                   try {
                       begin();
                       bytesRead = in.read(buf, 0, bytesToRead);
                   } finally {
                       end(bytesRead > 0);
                   }
                   if (bytesRead < 0)
                       break;
                   else
                       totalRead += bytesRead;
                   dst.put(buf, 0, bytesRead);
               }
               if ((bytesRead < 0) && (totalRead == 0))
                   return -1;

               return totalRead;
           }
       }

       protected void implCloseChannel() throws IOException {
           in.close();
           open = false;
       }
   }

As you can see it blocks when calling read(ByteBuffer dst) for the first time, and never blocks again. See:

           if ((totalRead > 0) && !(in.available() > 0))
               break; // block at most once

What is the reason behind such a weird behavior?

Also, what is the motivation for extending AbstractInterruptibleChannel without actually making this channel truly interruptible ?

+1  A: 

It will not block if it's already read at least one byte and the underlying stream announces that no bytes are available. Note that InputStream#available() can return zero even when some bytes are available, but it should not promise more bytes than can be read without blocking. Hence, this ReadableByteChannel makes an effort to read at least one byte -- assuming the provided ByteBuffer has room for at least one byte -- and, having done so, will not attempt to read the underlying stream again unless the stream is promising that more bytes are available without blocking.

As for why ReadableByteChannelImpl extends AbstractInterruptibleChannel, I suspect that it's to ensure that the wrapped InputStream will be closed properly upon calling Channel#close(), whose contract is further refined by InterruptibleChannel#close(). Extending AbstractInterruptibleChannel allows ReadableByteChannelImpl to borrow its thread-safe opened-closed state guards.

It is a bit of false advertising, as you say, not being truly interruptible, but it is tolerant of being closed from a separate thread, and makes doing so idempotent.

seh
Yeap, got it myself already. The main reason for blocking during the first read() call is just an absence of any reliable way to check if the underlying stream is empty without actually reading it.
VB