views:

102

answers:

2

ObjectInputStream blocks when it is created until it receives a serialization stream header and verifies it. I was trying to write my first program using sockets with it and ran into this. I used a dummy object so that it doesn't block. The code is here:

import java.io.*;                      
import java.net.*;                     
import java.util.*;                    

/**
 * Empty placeholder object that each peer writes once right after opening
 * its object stream.
 */
class Dummy implements Serializable {
    // Explicit version id so the serialized form does not depend on the
    // compiler-derived default.
    private static final long serialVersionUID = 1L;
}

/**
 * Serializable holder for a single {@code int}.
 */
class X_Int implements Serializable {
    // Explicit version id so the serialized form does not depend on the
    // compiler-derived default.
    private static final long serialVersionUID = 1L;

    /** The wrapped value. */
    int x;
}

/**
 * Minimal object-stream server: accepts a single connection on port 5879,
 * exchanges a {@link Dummy} object with the peer and then sends the current
 * {@link Date}.
 */
class Server {
        /**
         * Runs the server once and exits.
         *
         * @param args unused
         * @throws Exception on any socket or serialization failure
         */
        public static void main(String args[]) throws Exception {
                ServerSocket ss = new ServerSocket(5879);
                try {
                        Socket client = ss.accept();
                        try {
                                ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                                out.writeObject(new Dummy());
                                // Flush before constructing the input stream: its constructor
                                // blocks until the peer's stream header arrives, so everything
                                // we have written must already be on the wire by then.
                                out.flush();

                                ObjectInputStream in = new ObjectInputStream(client.getInputStream());
                                in.readObject(); // the peer's Dummy; value is not used

                                out.writeObject(new Date());
                                out.flush();
                        } finally {
                                // Closing the socket also closes both of its streams.
                                client.close();
                        }
                } finally {
                        ss.close();
                }
        }
}

/**
 * Minimal object-stream client: connects to localhost:5879, exchanges a
 * {@link Dummy} object with the server and prints the {@link Date} it
 * receives.
 */
class Client {
    /**
     * Runs the client once and exits.
     *
     * @param args unused
     * @throws Exception on any socket or serialization failure
     */
    public static void main(String args[]) throws Exception {
        Socket server = new Socket("localhost", 5879);
        try {
            ObjectOutputStream out = new ObjectOutputStream(server.getOutputStream());
            out.writeObject(new Dummy());
            // Flush before constructing the input stream: its constructor
            // blocks until the peer's stream header arrives, so everything
            // we have written must already be on the wire by then.
            out.flush();

            ObjectInputStream in = new ObjectInputStream(server.getInputStream());
            in.readObject(); // the peer's Dummy; value is not used

            Date d = (Date) in.readObject();
            System.out.println(d);
        } finally {
            // Closing the socket also closes both of its streams.
            server.close();
        }
    }
}

Is this the right way? Please comment.

A: 

A better way is to get rid of the cause of blocking in the first place. Use these classes instead on both ends, if you can:

/**
 * An {@link ObjectInputStream} that does not read a stream header.
 *
 * <p>Because {@link #readStreamHeader()} is overridden to do nothing, the
 * data being read must come from a writer that likewise emits no header.
 */
public class ObjInputStream extends ObjectInputStream {

    /**
     * Wraps the given stream.
     *
     * @param in the underlying stream to read serialized objects from
     * @throws IOException if the superclass constructor fails
     */
    public ObjInputStream(InputStream in) throws IOException {
        super(in);
    }

    /**
     * Intentionally empty: no header bytes are consumed or checked.
     *
     * @see java.io.ObjectInputStream#readStreamHeader()
     */
    @Override
    protected void readStreamHeader() throws IOException, StreamCorruptedException {
        // nothing to read
    }
}

and

/**
 * An {@link ObjectOutputStream} that does not write a stream header.
 *
 * <p>Because {@link #writeStreamHeader()} is overridden to do nothing, the
 * output must be read by a reader that likewise expects no header.
 */
public class ObjOutputStream extends ObjectOutputStream {

    /**
     * Wraps the given stream.
     *
     * @param out the underlying stream to write serialized objects to
     * @throws IOException if the superclass constructor fails
     */
    public ObjOutputStream(OutputStream out) throws IOException {
        super(out);
    }

    /**
     * Intentionally empty: no header bytes are emitted.
     *
     * @see java.io.ObjectOutputStream#writeStreamHeader()
     */
    @Override
    protected void writeStreamHeader() throws IOException {
        // nothing to write
    }
}

This removes the functions which are called to ascertain stream version info and such.

Additionally, because TCP is a byte stream rather than a message protocol, segmentation may cause your objects not to be received 'whole' in a single read on the other end. What you need is an additional framing input / output class. Luckily, I already coded this :)

/**
 * 
 */
package objtest;

import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

import kokuks.KokuKS;

/**
 * UnrealConceptTest - FramedInputStream
 *
 * <p>Reassembles length-prefixed frames from bytes pushed in through
 * {@link #putByte(byte)} / {@link #putBuffer(ByteBuffer)} and exposes the
 * payload of one completed frame at a time through the {@link InputStream}
 * read methods. Completed frames are queued; {@link #nextFrame()} makes the
 * next queued frame the one being read.
 *
 * <p>Wire layout as parsed by {@link #putByte(byte)}: the eight bytes of
 * {@link #HEADER_BYTES}, of which the eighth also serves as the most
 * significant byte of a big-endian 32-bit size; then the remaining three size
 * bytes; then {@code size} payload bytes. {@link #nextFrame()} skips the first
 * payload byte.
 *
 * @version 1.0
 */
public class FramedInputStream extends InputStream {
    public static final int INITIAL_BUFFER_SIZE = 8 << 1;

    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;

    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    protected static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];

    static {
        ByteBuffer b = ByteBuffer.allocateDirect(8);

        b.putInt(FRAME_HEADER_1);
        b.putInt(FRAME_HEADER_2);

        ByteBuffer b2 = (ByteBuffer) b.flip();

        b2.get(HEADER_BYTES, 0, 4);
        // NOTE(review): the destination offset is 3, not 4, so the second word
        // overlaps the first and HEADER_BYTES ends up as
        // BE EF FA FA CE BE EF 00. Do not "correct" this in isolation:
        // FramedOutputStream builds its header the same way, and putByte()
        // relies on the trailing 0x00 coinciding with the size's high byte.
        b2.get(HEADER_BYTES, 3, 4);
    }

    protected int     size         = 0;
    protected int     chain        = 0;
    protected boolean inFrame      = false;
    protected boolean readingSize  = false;
    protected int     sizePos      = 0;

    protected int dbgput = 0;


    /** Accumulates the payload of the frame currently being received. */
    protected ByteBuffer        bb  = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
    /** Completed frames waiting to be selected by {@link #nextFrame()}. */
    protected Queue<ByteBuffer> bbq = new ArrayDeque<ByteBuffer>();
    /** Frame currently served by the read methods, or {@code null}. */
    protected ByteBuffer        currBuff = null;

    protected final boolean     recoverFromError;

    /**
     * @param recoverFromError when {@code true}, header detection keeps
     *        running while a frame is being received, so a header sequence
     *        seen mid-frame discards the partial frame and starts a new one
     */
    public FramedInputStream(boolean recoverFromError) {
        this.recoverFromError = recoverFromError;
    }

    /** Creates a stream with error recovery enabled. */
    public FramedInputStream() {
        this(true);
    }

    /**
     * Grows the frame buffer, by repeated doubling, until it can hold at
     * least {@code min} bytes. Existing content and the write position are
     * carried over; the limit of a grown buffer is its new capacity.
     *
     * @param min required capacity in bytes
     * @return {@code true} if the buffer was replaced by a larger one
     */
    protected boolean ensureFramebufferCapacity(int min) {
        // 'min' is a byte count (the callers pass sizes), not a shift amount.
        if (min <= bb.capacity()) return false;

        int num = bb.capacity();
        while (num < min) {
            num <<= 1;
            if (num <= 0) {
                // doubling overflowed int; fall back to the exact request
                num = min;
                break;
            }
        }

        ByteBuffer bb2 = ByteBuffer.allocateDirect(num);
        // copy old data into new bytebuffer, then restore the write position
        int bb_pos = bb.position();
        bb.rewind();
        bb2.put(bb);
        bb2.position(bb_pos);
        bb = bb2;

        if (KokuKS.DEBUG_MODE) System.out.println("modified buffer size to: " + num);

        return true;
    }

    /**
     * @return the recoverFromError
     */
    public boolean isRecoverFromError() {
        return recoverFromError;
    }

    /**
     * Reads the next payload byte of the current frame.
     *
     * @return the byte as a value in {@code 0..255}, or {@code -1} when there
     *         is no current frame or it is exhausted
     * @see java.io.InputStream#read()
     */
    @Override
    public int read() throws IOException {
        if (currBuff == null || !currBuff.hasRemaining()) return -1;

        // Mask to an unsigned value: returning the raw (signed) byte would
        // turn 0xFF into -1, which callers interpret as end of stream.
        return currBuff.get() & 0xFF;
    }

    /**
     * Feeds every remaining byte of {@code source} through
     * {@link #putByte(byte)}.
     *
     * @param source bytes received from the transport; fully consumed
     */
    public void putBuffer(ByteBuffer source) {
        // No capacity adjustment here: putByte() sizes the frame buffer as
        // soon as a frame's length is known, and never writes past that limit.
        while (source.hasRemaining()) {
            putByte(source.get());
        }
    }

    /**
     * @return {@code true} if at least one completed frame is queued
     */
    public boolean checkCompleteFrame() {
        return !bbq.isEmpty();
    }

    /**
     * @return number of unread bytes in the current frame, or 0 if none
     * @see java.io.InputStream#available()
     */
    @Override
    public int available() throws IOException {
        return currBuff != null ? currBuff.remaining() : 0;
    }

    /**
     * Fills {@code data} completely from the current frame.
     *
     * @param data destination array
     * @return {@code data.length}, or {@code -1} when there is no current
     *         frame or it is exhausted
     * @throws BufferUnderflowException if the current frame holds fewer than
     *         {@code data.length} unread bytes (no partial read is performed)
     */
    public int read(byte[] data) {
        if (currBuff == null || !currBuff.hasRemaining()) {
            return -1;
        }

        if (data.length > currBuff.remaining()) {
            throw new BufferUnderflowException();
        }

        currBuff.get(data);

        return data.length;
    }

    /**
     * Makes the oldest queued frame the current one for reading.
     *
     * @return {@code true} if a frame was available, {@code false} otherwise
     */
    public boolean nextFrame() {
        ByteBuffer bbf = bbq.poll();

        if (bbf == null) {
            return false;
        }

        currBuff = ByteBuffer.allocateDirect(bbf.limit());
        currBuff.put(bbf).flip();

        // Skip the first payload byte. With FramedOutputStream as written in
        // this file, that byte is the last padding byte, not user data.
        currBuff.position(1);

        return true;
    }


    /**
     * Advances the frame parser by one received byte: matches the header,
     * then accumulates the size, then collects payload bytes and queues the
     * frame once {@code size} bytes have been stored.
     *
     * @param b the next byte from the transport
     */
    public void putByte(byte b) {
        if (recoverFromError || !inFrame) {
            if (b == HEADER_BYTES[chain++]) {

                if (chain >= (HEADER_BYTES.length)) {
                    if (KokuKS.DEBUG_MODE) System.out.println("got header!" + (inFrame ? " (recovered)" : ""));

                    // we have a header! hurrah.
                    inFrame = true;
                    sizePos = 0;
                    size = 0;
                    readingSize = true;
                    chain = 0;

                    bb.clear();
                }
            } else {
                chain = 0;
            }
        }

        // No 'else' here on purpose: the byte that completes the header falls
        // through and is also consumed as the first (most significant) size
        // byte.
        if (inFrame) {
            if (readingSize) {
                size += (b & 0xFF) << ((8 * 3) - (8 * sizePos));
                sizePos++;

                if (sizePos >= 4) {
                    // we've read the size :)
                    readingSize = false;
                    sizePos = 0;

                    ensureFramebufferCapacity(size);
                    bb.clear();
                    bb.limit(size); // set buffer limit to size
                }
            } else {
                bb.put(b);

                if (!bb.hasRemaining()) {
                    bb.flip();

                    // Queue a private copy so bb can be reused for the next frame.
                    ByteBuffer newbuf = ByteBuffer.allocateDirect(bb.limit());
                    newbuf.put(bb).flip(); //we have to flip this
                    bbq.offer(newbuf);

                    inFrame = false;
                    readingSize = false;
                    size = 0;
                    sizePos = 0;
                    chain = 0;

                    bb.clear();

                    if (KokuKS.DEBUG_MODE) System.out.println("FIS: complete object");
                }
            }
        }
    }
}

and

/**
 * 
 */
package objtest;

import java.io.IOException;
import java.nio.ByteBuffer;

import koku.util.io.ByteBufferOutputStream;

/**
 * UnrealConceptTest - FramedOutputStream
 *
 * <p>Output stream that starts its buffer with {@link #HEADER_BYTES} followed
 * by {@link #SIZE_PAD}, and fills in the frame length when {@link #flip()}
 * is called.
 *
 * @version 1.0
 * @author Chris Dennett
 */
public class FramedOutputStream extends ByteBufferOutputStream {
    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;

    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    public static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];

    /* Placeholder bytes written right after the header; flip() later stores
     * the frame length in this region. */
    protected static final byte[] SIZE_PAD = new byte[4];

    static {
        ByteBuffer words = ByteBuffer.allocate(8);
        words.putInt(FRAME_HEADER_1).putInt(FRAME_HEADER_2);
        words.flip();

        words.get(HEADER_BYTES, 0, 4);
        // The second copy lands at offset 3 (overlapping the first word), so
        // the table reads BE EF FA FA CE BE EF 00.
        words.get(HEADER_BYTES, 3, 4);
    }

    /**
     * Creates the stream and immediately writes the header and size padding.
     */
    public FramedOutputStream() {
        writeFramePrefix();
    }

    /**
     * Finalises the frame: stores the frame length and returns the buffer
     * positioned at its start.
     *
     * @return the underlying buffer, rewound
     * @see koku.util.io.ByteBufferOutputStream#flip()
     */
    @Override
    public ByteBuffer flip() {
        // presumably limits the buffer to what has been written so far -
        // behaviour of the superclass is not visible here
        super.flip();

        final int lengthFrom = 11;
        final int sizeOffset = 7;

        // the length counts everything from offset 11 up to the limit
        _buffer.position(lengthFrom);
        int frameLength = _buffer.remaining();

        // the four size bytes occupy offsets 7..10
        _buffer.position(sizeOffset);
        putSizeAsBytes(frameLength, _buffer);

        _buffer.rewind();
        return _buffer;
    }

    /**
     * Resets the stream and writes a fresh header and size padding.
     *
     * @see koku.util.io.ByteBufferOutputStream#reset()
     */
    @Override
    public void reset() {
        super.reset();
        writeFramePrefix();
    }

    /**
     * Writes {@code size} into {@code bb} as four bytes, most significant
     * byte first, at the buffer's current position.
     *
     * @param size value to encode
     * @param bb   destination buffer
     */
    public static void putSizeAsBytes(int size, ByteBuffer bb) {
        for (int shift = 8 * 3; shift >= 0; shift -= 8) {
            bb.put((byte) (size >>> shift));
        }
    }

    /** Writes HEADER_BYTES then SIZE_PAD; a failure is only reported on stdout. */
    private void writeFramePrefix() {
        try {
            write(HEADER_BYTES);
            write(SIZE_PAD);
        } catch (IOException e) {
            System.out.println("Couldn't write header padding!");
        }
    }
}
Chris Dennett
A: 

You just need to flush() the output before creating the object input stream. You don't need to send dummy objects.

Peter Lawrey