I have implemented a small I/O class that can read identical copies of a file from multiple disks (e.g., two hard disks that each contain the same file). In the sequential case, both disks average 60MB/s over the file, but when I interleave the reads (e.g., 4k from disk 1, 4k from disk 2, then combine), the effective read speed drops to 40MB/s instead of increasing.
Context: Win 7 + JDK 7b70, 2GB RAM, 2.2GB test file. Basically, I am trying to mimic Win7's ReadyBoost and RAID x in a poor man's fashion.
At its core, when a read() is issued to the class, it creates two tasks, each instructed to read from a pre-opened RandomAccessFile at a given position and length. Using an executor service and Future.get() calls, once both finish, the data read is copied into a common buffer and returned to the caller.
Is there a conceptual error in my approach? (For example, will the OS caching mechanism always counteract it?)
protected <T> List<T> waitForAll(List<Future<T>> futures)
        throws MultiIOException {
    MultiIOException mex = null;
    int i = 0;
    List<T> result = new ArrayList<T>(futures.size());
    for (Future<T> f : futures) {
        try {
            result.add(f.get());
        } catch (InterruptedException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
        } catch (ExecutionException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
        }
        i++;
    }
    if (mex != null) {
        throw mex;
    }
    return result;
}
public int read(long position, byte[] output, int start, int length)
        throws IOException {
    if (start < 0 || start + length > output.length) {
        throw new IndexOutOfBoundsException(
                String.format("start=%d, length=%d, output=%d",
                        start, length, output.length));
    }
    // compute the fragment sizes and positions
    int result = 0;
    final long[] positions = new long[metrics.length];
    final int[] lengths = new int[metrics.length];
    double speedSum = 0.0;
    double maxValue = 0.0;
    int maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        speedSum += metrics[i].readSpeed;
        if (metrics[i].readSpeed > maxValue) {
            maxValue = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    // adjust read lengths
    int lengthSum = length;
    for (int i = 0; i < metrics.length; i++) {
        int len = (int)Math.ceil(length * metrics[i].readSpeed / speedSum);
        lengths[i] = (len > lengthSum) ? lengthSum : len;
        lengthSum -= lengths[i];
    }
    if (lengthSum > 0) {
        lengths[maxIndex] += lengthSum;
    }
    // adjust read positions
    long positionDelta = position;
    for (int i = 0; i < metrics.length; i++) {
        positions[i] = positionDelta;
        positionDelta += (long)lengths[i];
    }
    List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
    // read in parallel
    for (int i = 0; i < metrics.length; i++) {
        final int j = i;
        futures.add(exec.submit(new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                byte[] buffer = new byte[lengths[j]];
                long t = System.nanoTime();
                long t0 = t;
                long currPos = metrics[j].handle.getFilePointer();
                metrics[j].handle.seek(positions[j]);
                t = System.nanoTime() - t;
                // seek time in seconds per MB of distance; skip the update
                // when the handle did not move, to avoid dividing by zero
                long distance = Math.abs(currPos - positions[j]);
                if (distance > 0) {
                    metrics[j].seekTime = t * 1024.0 * 1024.0 / distance / 1E9;
                }
                // note: read() may return fewer bytes than buffer.length
                int c = metrics[j].handle.read(buffer);
                t0 = System.nanoTime() - t0;
                // adjust the read speed (MB/s) if we read something
                if (c > 0) {
                    metrics[j].readSpeed = (alpha * c * 1E9 / t0 / 1024 / 1024
                            + (1 - alpha) * metrics[j].readSpeed);
                }
                if (c < 0) {
                    return null; // end of file
                } else if (c == 0) {
                    return EMPTY_BYTE_ARRAY;
                } else if (c < buffer.length) {
                    return Arrays.copyOf(buffer, c);
                }
                return buffer;
            }
        }));
    }
    List<byte[]> data = waitForAll(futures);
    boolean eof = true;
    for (byte[] b : data) {
        if (b != null && b.length > 0) {
            System.arraycopy(b, 0, output, start + result, b.length);
            result += b.length;
            eof = false;
        } else {
            break; // the rest probably reached EOF
        }
    }
    // if there was no data at all, we reached the end of file
    if (eof) {
        return -1;
    }
    sequentialPosition = position + (long)result;
    // evaluate the fastest file to read
    double maxSpeed = 0;
    maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        if (metrics[i].readSpeed > maxSpeed) {
            maxSpeed = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    fastest = metrics[maxIndex];
    return result;
}
(The FileMetrics entries in the metrics array hold read-speed measurements that are used to adaptively size the reads from the individual input channels. In my test, alpha = 0 and readSpeed = 1, which results in an equal distribution.)
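To make the adaptive sizing easier to follow in isolation, here is a minimal sketch of the same proportional split and of the exponential smoothing of readSpeed. The class name SplitSketch and the method names split and updateSpeed are hypothetical and only used for illustration; the arithmetic mirrors the read() method above.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the original class.
final class SplitSketch {

    /** Splits 'length' bytes across sources in proportion to their speeds. */
    static int[] split(int length, double[] speeds) {
        double speedSum = 0.0;
        int maxIndex = 0;
        for (int i = 0; i < speeds.length; i++) {
            speedSum += speeds[i];
            if (speeds[i] > speeds[maxIndex]) {
                maxIndex = i;
            }
        }
        int[] lengths = new int[speeds.length];
        int remaining = length;
        for (int i = 0; i < speeds.length; i++) {
            int len = (int) Math.ceil(length * speeds[i] / speedSum);
            lengths[i] = Math.min(len, remaining); // never hand out more than is left
            remaining -= lengths[i];
        }
        lengths[maxIndex] += remaining; // any leftover goes to the fastest source
        return lengths;
    }

    /** Exponential smoothing: alpha = 0 keeps the old value unchanged. */
    static double updateSpeed(double oldSpeed, double measured, double alpha) {
        return alpha * measured + (1 - alpha) * oldSpeed;
    }

    public static void main(String[] args) {
        // equal speeds give an equal split, as in the test described above
        System.out.println(Arrays.toString(split(8192, new double[] {1.0, 1.0})));   // [4096, 4096]
        System.out.println(Arrays.toString(split(9000, new double[] {60.0, 30.0}))); // [6000, 3000]
    }
}
```

With alpha = 0 the measured speed is ignored, so readSpeed stays at its initial value of 1 for every file and each read() is always split evenly.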
Edit: I ran a non-interleaved test (i.e., read the two files independently in separate threads) and got a combined effective speed of 110MB/s.
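For reference, a test of this kind can be sketched roughly as follows. This is not the exact code I ran: the class name IndependentReadSketch and its methods are hypothetical, it uses lambdas and try-with-resources (so it needs Java 8 or later rather than the JDK 7 build above), and the demo in main() uses small temporary files instead of copies on two physical disks.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical benchmark helper, not part of the original class.
final class IndependentReadSketch {

    /** Reads one file front to back and returns the number of bytes read. */
    static long readFully(Path file, int bufferSize) {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            int c;
            while ((c = raf.read(buffer)) > 0) {
                total += c;
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return total;
    }

    /** Reads every file in its own thread; returns the combined throughput in MB/s. */
    static double combinedMBps(List<Path> files, int bufferSize) {
        ExecutorService exec = Executors.newFixedThreadPool(files.size());
        try {
            long start = System.nanoTime();
            List<Future<Long>> futures = new ArrayList<>();
            for (Path file : files) {
                futures.add(exec.submit(() -> readFully(file, bufferSize)));
            }
            long bytes = 0;
            for (Future<Long> f : futures) {
                bytes += f.get();
            }
            long elapsed = Math.max(1L, System.nanoTime() - start);
            return bytes * 1e9 / elapsed / 1024 / 1024;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            exec.shutdown();
        }
    }

    /** Demo helper: creates a zero-filled temporary file of the given size. */
    static Path tempFileOfSize(int bytes) {
        try {
            Path p = Files.createTempFile("multi-io-", ".bin");
            Files.write(p, new byte[bytes]);
            p.toFile().deleteOnExit();
            return p;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        List<Path> files = new ArrayList<>();
        files.add(tempFileOfSize(1 << 20));
        files.add(tempFileOfSize(1 << 20));
        System.out.printf("combined: %.1f MB/s%n", combinedMBps(files, 64 * 1024));
    }
}
```

For a meaningful number, the paths would have to point at large files on separate physical disks, and the files should be bigger than RAM so that the OS cache does not serve the reads.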
Edit 2: I think I know why this is happening.
When I read in parallel and in sequence, each disk does not see a sequential read but rather a read-skip-read-skip pattern due to the interleaving (possibly riddled with allocation table lookups as well). This basically reduces the effective read speed per disk to half or worse.
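The pattern can be made concrete with a small sketch that lists the offsets each disk is asked for when consecutive read() calls are split into equal chunks. The class name AccessPatternSketch and its methods are hypothetical, and usefulFraction is a deliberately simplified model: it assumes a disk spends as much time passing over the skipped bytes as it would reading them, which ignores read-ahead and seek costs.

```java
import java.util.Arrays;

// Hypothetical illustration of the interleaved access pattern.
final class AccessPatternSketch {

    /** Start offsets one disk sees when every read() call is split into equal chunks. */
    static long[] offsetsSeenByDisk(int disk, int disks, int chunk, int calls) {
        long[] offsets = new long[calls];
        for (int k = 0; k < calls; k++) {
            // call k covers [k * disks * chunk, (k + 1) * disks * chunk)
            offsets[k] = (long) k * disks * chunk + (long) disk * chunk;
        }
        return offsets;
    }

    /** Bytes skipped on one disk between the end of a read and the start of the next. */
    static long skippedBetweenReads(int disks, int chunk) {
        return (long) (disks - 1) * chunk;
    }

    /** Simplified model: share of the bytes passed over that are actually used. */
    static double usefulFraction(int disks) {
        return 1.0 / disks;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(offsetsSeenByDisk(0, 2, 4096, 4))); // [0, 8192, 16384, 24576]
        System.out.println(Arrays.toString(offsetsSeenByDisk(1, 2, 4096, 4))); // [4096, 12288, 20480, 28672]
        System.out.println(skippedBetweenReads(2, 4096)); // 4096
        System.out.println(usefulFraction(2));            // 0.5
    }
}
```

With two disks and 4k chunks, each disk reads 4k and then skips 4k, so under this model at most half of what a disk passes over is useful, which is consistent with the "half or worse" estimate.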