I've written background InputStream (and OutputStream) implementations that wrap other streams and read ahead on a background thread, primarily so that decompression/compression can happen on a different thread from the processing of the decompressed stream.
It's a fairly standard producer/consumer model.
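To illustrate the shape of that producer/consumer split using only JDK classes, here is a minimal sketch built on PipedInputStream/PipedOutputStream. It is not the implementation discussed in this post: the class name PipedDecompressSketch, the method decompressInBackground, the choice of GZIP rather than ZIP, and the buffer sizes are all assumptions made for the example, and errors on the background thread are deliberately not propagated.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.zip.GZIPInputStream;

/** Hypothetical sketch: decompress on one thread, consume on another. */
public class PipedDecompressSketch {

    /**
     * Returns a stream of decompressed bytes. The GZIP decoding runs on a
     * background (producer) thread; the caller (consumer) only reads plain bytes.
     */
    static InputStream decompressInBackground(InputStream compressed) throws IOException {
        PipedInputStream consumerSide = new PipedInputStream(64 * 1024); // bounded pipe buffer
        PipedOutputStream producerSide = new PipedOutputStream(consumerSide);

        Thread producer = new Thread(() -> {
            // 'out' is declared first so the pipe is closed (signalling EOF to the
            // consumer) even if opening the GZIP stream fails.
            try (PipedOutputStream out = producerSide;
                 InputStream in = new GZIPInputStream(compressed)) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n); // blocks while the pipe buffer is full
                }
            } catch (IOException e) {
                // Assumption for this sketch: failures are swallowed, so the
                // consumer just sees an early end of stream.
            }
        }, "background-decompress");
        producer.setDaemon(true);
        producer.start();
        return consumerSide;
    }
}
```

The pipe's internal buffer plays the role of the bounded queue: the producer blocks when it is full and the consumer blocks when it is empty.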
This seems like an easy way to make good use of multi-core CPUs for simple processes that read, process, and write data, since it keeps both the CPU and the disk busier. 'Efficient' may not be the best word, but it gives higher utilisation and, of more interest to me, shorter runtimes than reading directly from a ZipInputStream and writing directly to a ZipOutputStream.
I'm happy to post the code, but my question is whether I'm reinventing something readily available in existing (and more heavily exercised) libraries?
Edit - posting code...
My code for the BackgroundInputStream is below (the BackgroundOutputStream is very similar), but there are aspects of it that I'd like to improve.
- It looks like I'm working far too hard to pass buffers back and forth.
- If the calling code throws away its references to the BackgroundInputStream, the backgroundReaderThread will hang around forever.
- Signalling eof needs improving.
- Exceptions should be propagated to the foreground thread.
- I'd like to allow using a thread from a provided Executor.
- The close() method should signal the background thread, and shouldn't close the wrapped stream, as the wrapped stream should be owned by the background thread that reads from it.
- Doing silly things like reading after closing should be catered for appropriately.
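Several of the points above (eof signalling, exception propagation, a caller-supplied Executor, close() signalling the reader, and reads after close) can be sketched together. The following is only one possible shape, not the code from this post: the class name ReadAheadInputStream and the Chunk holder are hypothetical, a fresh buffer is allocated per chunk instead of being recycled, and the abandoned-reference problem is not addressed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

/** Hypothetical sketch, not the BackgroundInputStream from this post. */
public class ReadAheadInputStream extends InputStream {
    /** A filled buffer, or (when data is null) a terminal marker: EOF or a failure. */
    private static final class Chunk {
        final byte[] data;
        final int length;
        final IOException failure;
        Chunk(byte[] data, int length, IOException failure) {
            this.data = data; this.length = length; this.failure = failure;
        }
    }

    private final BlockingQueue<Chunk> queue;
    private final FutureTask<Void> readerTask;
    private Chunk current;   // chunk being consumed; foreground thread only
    private Chunk terminal;  // set once the EOF or failure marker has arrived
    private int pos;
    private boolean closed;

    public ReadAheadInputStream(InputStream source, Executor executor, int queueSize, int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.readerTask = new FutureTask<Void>(() -> { pump(source, bufferSize); return null; });
        executor.execute(readerTask); // FutureTask is a Runnable, so any Executor will do
    }

    /** Runs on the background thread, which owns (and closes) the wrapped stream. */
    private void pump(InputStream source, int bufferSize) throws InterruptedException {
        Chunk last = new Chunk(null, 0, null); // plain EOF unless a read fails
        try {
            while (true) {
                byte[] buf = new byte[bufferSize];
                int n = source.read(buf);
                if (n == -1) break;
                if (n > 0) queue.put(new Chunk(buf, n, null)); // blocks while the queue is full
            }
        } catch (IOException e) {
            last = new Chunk(null, 0, e); // hand the failure to the foreground thread
        } finally {
            try { source.close(); } catch (IOException ignored) { /* best effort */ }
        }
        queue.put(last);
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) == -1 ? -1 : (one[0] & 0xFF); // mask to 0..255
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (closed) throw new IOException("Stream closed");
        if (len == 0) return 0;
        if (current == null && terminal == null) {
            try {
                Chunk next = queue.take();
                if (next.data == null) { terminal = next; } else { current = next; pos = 0; }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("Interrupted while waiting for data");
            }
        }
        if (current == null) {
            if (terminal.failure != null) throw new IOException("Background read failed", terminal.failure);
            return -1;
        }
        int n = Math.min(len, current.length - pos);
        System.arraycopy(current.data, pos, b, off, n);
        pos += n;
        if (pos == current.length) current = null;
        return n;
    }

    @Override
    public int available() {
        return current == null ? 0 : current.length - pos;
    }

    @Override
    public void close() {
        closed = true;
        // Interrupts the reader if it is blocked on queue.put; a reader blocked
        // inside source.read may not notice the interrupt until that call returns.
        readerTask.cancel(true);
        queue.clear();
        current = null;
    }
}
```

Because EOF and failures travel through the same queue as the data, the foreground thread never has to poll a separate flag. Usage would look something like new ReadAheadInputStream(new GZIPInputStream(fileIn), executor, 4, 64 * 1024), where fileIn and executor are whatever the caller already has.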
    package nz.co.datacute.io;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InterruptedIOException;
    import java.util.Arrays;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BackgroundInputStream extends InputStream {
        private static final int DEFAULT_QUEUE_SIZE = 1;
        private static final int DEFAULT_BUFFER_SIZE = 64*1024;
        private final int queueSize;
        private final int bufferSize;
        private volatile boolean eof = false;
        private LinkedBlockingQueue<byte[]> bufferQueue;
        private final InputStream wrappedInputStream;
        private byte[] currentBuffer;
        private volatile byte[] freeBuffer;
        private int pos;

        public BackgroundInputStream(InputStream wrappedInputStream) {
            this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
        }

        public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
            this.wrappedInputStream = wrappedInputStream;
            this.queueSize = queueSize;
            this.bufferSize = bufferSize;
        }

        @Override
        public int read() throws IOException {
            // Lazily start the background reader on the first read.
            if (bufferQueue == null) {
                bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
                BackgroundReader backgroundReader = new BackgroundReader();
                Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
                backgroundReaderThread.start();
            }
            if (currentBuffer == null) {
                try {
                    if ((!eof) || (bufferQueue.size() > 0)) {
                        currentBuffer = bufferQueue.take();
                        pos = 0;
                    } else {
                        return -1;
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and fail the read, rather than
                    // falling through with a null currentBuffer.
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("Interrupted while waiting for data");
                }
            }
            // Mask to 0..255: bytes are signed, so 0xFF would otherwise be returned as -1 (EOF).
            int b = currentBuffer[pos++] & 0xFF;
            if (pos == currentBuffer.length) {
                // Hand the exhausted buffer back for reuse by the reader.
                freeBuffer = currentBuffer;
                currentBuffer = null;
            }
            return b;
        }

        @Override
        public int available() throws IOException {
            if (currentBuffer == null) return 0;
            // Only the unread remainder of the current buffer is available.
            return currentBuffer.length - pos;
        }

        @Override
        public void close() throws IOException {
            wrappedInputStream.close();
            currentBuffer = null;
            freeBuffer = null;
        }

        class BackgroundReader implements Runnable {
            @Override
            public void run() {
                try {
                    while (!eof) {
                        // Reuse the buffer returned by the consumer if there is one.
                        byte[] newBuffer;
                        if (freeBuffer != null) {
                            newBuffer = freeBuffer;
                            freeBuffer = null;
                        } else {
                            newBuffer = new byte[bufferSize];
                        }
                        int bytesRead = 0;
                        int writtenToBuffer = 0;
                        // Fill the buffer; the size check comes first to avoid a zero-length read.
                        while ((writtenToBuffer < bufferSize) && ((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1)) {
                            writtenToBuffer += bytesRead;
                        }
                        if (writtenToBuffer > 0) {
                            if (writtenToBuffer < bufferSize) {
                                // Trim the final, partially filled buffer.
                                newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                            }
                            bufferQueue.put(newBuffer);
                        }
                        if (bytesRead == -1) {
                            eof = true;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }