A better way is to get rid of the cause of blocking in the first place. Use these classes instead on both ends, if you can:
/**
 * An {@link ObjectInputStream} that does not read a serialization stream
 * header when it is constructed.
 * <p>
 * The peer must write its data without a stream header as well; otherwise the
 * header bytes would be interpreted as object data.
 */
public class ObjInputStream extends ObjectInputStream {

    /**
     * Creates a header-less object input stream on top of {@code in}.
     *
     * @param in the underlying stream to read serialized objects from
     * @throws IOException if the superclass constructor fails
     */
    public ObjInputStream(InputStream in) throws IOException {
        super(in);
    }

    /**
     * Intentionally reads nothing: the magic number and version normally
     * consumed here are not expected on this stream.
     *
     * @see java.io.ObjectInputStream#readStreamHeader()
     */
    @Override
    protected void readStreamHeader() throws IOException, StreamCorruptedException {
        // No header to consume.
    }
}
and
/**
 * An {@link ObjectOutputStream} that does not emit a serialization stream
 * header when it is constructed.
 * <p>
 * The peer must read the data without expecting a stream header; a standard
 * {@code ObjectInputStream} would reject the stream.
 */
public class ObjOutputStream extends ObjectOutputStream {

    /**
     * Creates a header-less object output stream on top of {@code out}.
     *
     * @param out the underlying stream to write serialized objects to
     * @throws IOException if the superclass constructor fails
     */
    public ObjOutputStream(OutputStream out) throws IOException {
        super(out);
    }

    /**
     * Intentionally writes nothing: the magic number and version normally
     * emitted here are omitted from this stream.
     *
     * @see java.io.ObjectOutputStream#writeStreamHeader()
     */
    @Override
    protected void writeStreamHeader() throws IOException {
        // No header to emit.
    }
}
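These subclasses skip writing and reading the serialization stream header (the magic number and stream version info), which the ObjectInputStream constructor would otherwise block on until the other end sends it. Both ends must use them, because a standard stream would still emit or expect that header.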
Additionally, because you are using TCP, your objects are not guaranteed to be received 'whole' on the other end -- TCP is a byte stream, not a message protocol, so segmentation can split one object across several reads (or deliver parts of several objects in one read). What you need is an additional framing input / output class. Luckily, I already coded this :)
/**
*
*/
package objtest;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import kokuks.KokuKS;
/**
 * UnrealConceptTest - FramedInputStream
 * <p>
 * Reassembles length-prefixed frames from bytes pushed in through
 * {@link #putByte(byte)} / {@link #putBuffer(ByteBuffer)}, queues every
 * completed frame, and exposes the frame selected by {@link #nextFrame()}
 * through the regular {@link InputStream} read methods.
 * <p>
 * Frame layout as decoded by this class:
 * <ol>
 * <li>the eight bytes of {@link #HEADER_BYTES};</li>
 * <li>a four byte big-endian size whose most significant byte <em>is</em> the
 * last header byte (always {@code 0x00}), followed by three more size
 * bytes - sizes are therefore limited to 24 bits;</li>
 * <li>{@code size} body bytes, the first of which is skipped by
 * {@link #nextFrame()}.</li>
 * </ol>
 * This matches what {@code FramedOutputStream} emits; the overlaps are kept
 * on purpose so that existing peers stay compatible.
 *
 * @version 1.0
 */
public class FramedInputStream extends InputStream {
    public static final int INITIAL_BUFFER_SIZE = 8 << 1;
    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;
    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    protected static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];

    /** Fallback table used to resume header matching after a mismatch. */
    private static final int[] HEADER_FALLBACK = new int[HEADER_BYTES.length];

    static {
        ByteBuffer b = ByteBuffer.allocateDirect(8);
        b.putInt(FRAME_HEADER_1);
        b.putInt(FRAME_HEADER_2);
        ByteBuffer b2 = (ByteBuffer) b.flip();
        b2.get(HEADER_BYTES, 0, 4);
        // NOTE(review): offset 3 (not 4) overwrites HEADER_BYTES[3] and leaves
        // HEADER_BYTES[7] == 0, giving BE EF FA FA CE BE EF 00. It looks like
        // an off-by-one, but FramedOutputStream builds the identical sequence
        // and the size decoding in putByte relies on the trailing zero, so it
        // must not be changed on this side alone.
        b2.get(HEADER_BYTES, 3, 4);

        // Longest proper prefix of the header that is also a suffix of
        // HEADER_BYTES[0..i]; lets the scanner recover from partial matches.
        for (int i = 1, k = 0; i < HEADER_BYTES.length; i++) {
            while (k > 0 && HEADER_BYTES[i] != HEADER_BYTES[k]) {
                k = HEADER_FALLBACK[k - 1];
            }
            if (HEADER_BYTES[i] == HEADER_BYTES[k]) {
                k++;
            }
            HEADER_FALLBACK[i] = k;
        }
    }

    /** Size of the frame currently being received, in bytes. */
    protected int size = 0;
    /** Number of header bytes matched so far. */
    protected int chain = 0;
    /** True between a recognised header and the completion of its frame. */
    protected boolean inFrame = false;
    /** True while the four size bytes are being accumulated. */
    protected boolean readingSize = false;
    /** Index (0..3) of the next size byte. */
    protected int sizePos = 0;
    protected int dbgput = 0;
    /** Accumulates the body of the frame currently being received. */
    protected ByteBuffer bb = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
    /** Completed frames that have not been selected by nextFrame() yet. */
    protected Queue<ByteBuffer> bbq = new ArrayDeque<ByteBuffer>();
    /** Frame the read methods currently consume, or null before the first nextFrame(). */
    protected ByteBuffer currBuff = null;
    protected final boolean recoverFromError;

    /**
     * @param recoverFromError if true, the header is searched for even while
     *        a frame is being received, and a header found there restarts the
     *        frame
     */
    public FramedInputStream(boolean recoverFromError) {
        this.recoverFromError = recoverFromError;
    }

    /** Creates a stream with error recovery enabled. */
    public FramedInputStream() {
        this(true);
    }

    /**
     * Grows the frame buffer so that it can hold at least {@code min} bytes.
     * The capacity is doubled until it is large enough. Bytes between index 0
     * and the old limit are copied and the position is kept; the limit of the
     * new buffer is its capacity.
     *
     * @param min required capacity in bytes
     * @return true if a larger buffer was allocated, false if the current one
     *         was already big enough
     */
    protected boolean ensureFramebufferCapacity(int min) {
        if (min <= bb.capacity()) {
            return false;
        }
        int num = Math.max(bb.capacity(), 1);
        while (num < min) {
            if (num > (Integer.MAX_VALUE >> 1)) {
                // doubling would overflow; settle for exactly what is needed
                num = min;
                break;
            }
            num <<= 1;
        }
        ByteBuffer bb2 = ByteBuffer.allocateDirect(num);
        // copy old data into new bytebuffer, then restore the write position
        int bb_pos = bb.position();
        bb.rewind();
        bb2.put(bb);
        bb2.position(bb_pos);
        bb = bb2;
        if (KokuKS.DEBUG_MODE) System.out.println("modified buffer size to: " + num);
        return true;
    }

    /**
     * @return the recoverFromError
     */
    public boolean isRecoverFromError() {
        return recoverFromError;
    }

    /**
     * Reads the next byte of the current frame.
     *
     * @return the byte as a value in 0..255, or -1 if no frame is selected or
     *         the current frame is exhausted
     * @see java.io.InputStream#read()
     */
    @Override
    public int read() throws IOException {
        if (currBuff == null || !currBuff.hasRemaining()) {
            return -1;
        }
        // Mask to avoid sign extension: 0xFF must not be mistaken for EOF.
        return currBuff.get() & 0xFF;
    }

    /**
     * Feeds every remaining byte of {@code source} into the frame decoder.
     *
     * @param source buffer to drain
     */
    public void putBuffer(ByteBuffer source) {
        // The frame buffer is sized when a frame's length is known (see
        // putByte), so nothing has to be pre-allocated here.
        while (source.hasRemaining()) {
            putByte(source.get());
        }
    }

    /**
     * @return true if at least one completed frame is waiting for nextFrame()
     */
    public boolean checkCompleteFrame() {
        return !bbq.isEmpty();
    }

    /**
     * @return number of unread bytes left in the current frame
     * @see java.io.InputStream#available()
     */
    @Override
    public int available() throws IOException {
        return currBuff != null ? currBuff.remaining() : 0;
    }

    /**
     * Reads up to {@code data.length} bytes of the current frame.
     *
     * @param data destination array
     * @return number of bytes copied, 0 for an empty array, or -1 if no frame
     *         is selected or the current frame is exhausted
     */
    @Override
    public int read(byte[] data) {
        return read(data, 0, data.length);
    }

    /**
     * Reads up to {@code len} bytes of the current frame into {@code data}
     * starting at {@code off}. Fewer bytes are returned when the frame holds
     * fewer; a read never crosses into the next frame.
     *
     * @param data destination array
     * @param off first index to write to
     * @param len maximum number of bytes to copy
     * @return number of bytes copied, 0 if {@code len} is 0, or -1 if no frame
     *         is selected or the current frame is exhausted
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
     *         describe a range inside {@code data}
     */
    @Override
    public int read(byte[] data, int off, int len) {
        if (off < 0 || len < 0 || len > data.length - off) {
            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len
                    + ", array length=" + data.length);
        }
        if (len == 0) {
            return 0;
        }
        if (currBuff == null || !currBuff.hasRemaining()) {
            return -1;
        }
        int count = Math.min(len, currBuff.remaining());
        currBuff.get(data, off, count);
        return count;
    }

    /**
     * Makes the oldest queued frame the current one for the read methods.
     * The first byte of the frame body is skipped.
     *
     * @return true if a frame was selected, false if none was queued (the
     *         current frame is left untouched in that case)
     */
    public boolean nextFrame() {
        ByteBuffer bbf = bbq.poll();
        if (bbf == null) {
            return false;
        }
        // Queued buffers are private copies made in putByte; a view suffices.
        currBuff = bbf.slice();
        // Skip the leading pad byte, guarding against an empty buffer.
        currBuff.position(Math.min(1, currBuff.limit()));
        return true;
    }

    /**
     * Feeds one received byte into the frame decoder. Completed frames are
     * appended to the queue read by {@link #nextFrame()}.
     *
     * @param b the received byte
     */
    public void putByte(byte b) {
        if (recoverFromError || !inFrame) {
            // After a mismatch fall back to the longest header prefix that
            // still matches, so a header overlapping a partial match is found.
            while (chain > 0 && b != HEADER_BYTES[chain]) {
                chain = HEADER_FALLBACK[chain - 1];
            }
            if (b == HEADER_BYTES[chain]) {
                chain++;
            }
            if (chain >= HEADER_BYTES.length) {
                if (KokuKS.DEBUG_MODE) System.out.println("got header!" + (inFrame ? " (recovered)" : ""));
                // we have a header! hurrah.
                inFrame = true;
                sizePos = 0;
                size = 0;
                readingSize = true;
                chain = 0;
                bb.clear();
            }
        }
        if (!inFrame) {
            return;
        }
        if (readingSize) {
            // Deliberate fall-through from header completion: the final
            // header byte (0x00) is also the most significant size byte.
            size += (b & 0xFF) << ((8 * 3) - (8 * sizePos));
            sizePos++;
            if (sizePos >= 4) {
                // we've read the size :)
                readingSize = false;
                sizePos = 0;
                if (size <= 0) {
                    // Malformed length: drop the frame and wait for the next
                    // header instead of overflowing the buffer later.
                    inFrame = false;
                    size = 0;
                    bb.clear();
                    return;
                }
                ensureFramebufferCapacity(size);
                bb.clear();
                bb.limit(size); // set buffer limit to size
            }
            return;
        }
        bb.put(b);
        if (!bb.hasRemaining()) {
            // Frame complete: queue a private copy and reset the decoder.
            bb.flip();
            ByteBuffer newbuf = ByteBuffer.allocateDirect(bb.limit());
            newbuf.put(bb).flip(); //we have to flip this
            bbq.offer(newbuf);
            inFrame = false;
            readingSize = false;
            size = 0;
            sizePos = 0;
            chain = 0;
            bb.clear();
            if (KokuKS.DEBUG_MODE) System.out.println("FIS: complete object");
        }
    }
}
and
/**
*
*/
package objtest;
import java.io.IOException;
import java.nio.ByteBuffer;
import koku.util.io.ByteBufferOutputStream;
/**
 * UnrealConceptTest - FramedOutputStream
 * <p>
 * Collects payload bytes behind a reserved preamble and, on {@link #flip()},
 * turns the buffer into one frame:
 * <ol>
 * <li>bytes 0..6: the first seven bytes of {@link #HEADER_BYTES};</li>
 * <li>bytes 7..10: big-endian frame size, overwriting the last header byte
 * (which is {@code 0x00}) and the first three pad bytes;</li>
 * <li>byte 11: the remaining pad byte, counted as part of the frame;</li>
 * <li>bytes 12..: the payload.</li>
 * </ol>
 * The written size is therefore payload length + 1, and it must fit in 24
 * bits so that byte 7 stays {@code 0x00} and {@code FramedInputStream} can
 * still recognise the header.
 *
 * @version 1.0
 * @author Chris Dennett
 */
public class FramedOutputStream extends ByteBufferOutputStream {
    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;
    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    public static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];
    /* We pad the beginning of our buffer so that we can write the frame
     * length when the time comes. */
    protected static final byte[] SIZE_PAD = new byte[4];

    /** Buffer index of the first size byte; overlaps the last header byte. */
    private static final int SIZE_OFFSET = HEADER_BYTES.length - 1;
    /** Buffer index of the first byte counted by the frame size. */
    private static final int FRAME_BODY_OFFSET = SIZE_OFFSET + 4;
    /** Largest size whose most significant byte is still 0x00. */
    private static final int MAX_FRAME_SIZE = 0x00FFFFFF;

    static {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.putInt(FRAME_HEADER_1);
        b.putInt(FRAME_HEADER_2);
        ByteBuffer b2 = (ByteBuffer) b.flip();
        b2.get(HEADER_BYTES, 0, 4);
        // NOTE(review): offset 3 (not 4) yields BE EF FA FA CE BE EF 00. It
        // looks like an off-by-one, but FramedInputStream builds the same
        // sequence and the size placement in flip() relies on the trailing
        // zero, so it must not be changed on this side alone.
        b2.get(HEADER_BYTES, 3, 4);
    }

    /**
     * Creates an empty frame buffer that already contains the preamble.
     *
     * @throws IllegalStateException if the preamble cannot be written
     */
    public FramedOutputStream() {
        writePreamble();
    }

    /**
     * Finalises the frame: flips the buffer, writes the frame size into the
     * reserved slot and rewinds so the whole frame can be read from index 0.
     *
     * @return the inherited buffer, positioned at the start of the frame
     * @throws IllegalStateException if the frame size does not fit in 24 bits
     * @see koku.util.io.ByteBufferOutputStream#flip()
     */
    @Override
    public ByteBuffer flip() {
        // flip the buffer which will limit it to its current position
        super.flip();
        // Everything from the last pad byte onwards counts towards the size.
        _buffer.position(FRAME_BODY_OFFSET);
        int size = _buffer.remaining();
        if (size > MAX_FRAME_SIZE) {
            // A larger size would put a non-zero byte where the reader
            // expects the final header byte, making the frame undecodable.
            throw new IllegalStateException("Frame too large: " + size
                    + " bytes (max " + MAX_FRAME_SIZE + ")");
        }
        _buffer.position(SIZE_OFFSET);
        putSizeAsBytes(size, _buffer);
        _buffer.rewind();
        return _buffer;
    }

    /**
     * Discards the buffered data and starts a new frame with a fresh preamble.
     *
     * @throws IllegalStateException if the preamble cannot be written
     * @see koku.util.io.ByteBufferOutputStream#reset()
     */
    @Override
    public void reset() {
        super.reset();
        writePreamble();
    }

    /** Writes the header bytes followed by the size placeholder. */
    private void writePreamble() {
        try {
            write(HEADER_BYTES);
            write(SIZE_PAD);
        } catch (IOException e) {
            // Without the preamble every later flip() would emit a corrupt
            // frame, so fail loudly instead of continuing.
            throw new IllegalStateException("Couldn't write header padding!", e);
        }
    }

    /**
     * Writes {@code size} as four big-endian bytes at the buffer's current
     * position, advancing the position by four.
     *
     * @param size value to encode
     * @param bb destination buffer
     */
    public static void putSizeAsBytes(int size, ByteBuffer bb) {
        // most significant byte first
        for (int shift = 8 * 3; shift >= 0; shift -= 8) {
            bb.put((byte) ((size >>> shift) & 0xFF));
        }
    }
}