Hi,

We are working on a program where we need to flush a GZIPOutputStream, that is, force it to compress and send the data written so far. The problem is that the flush() method of GZIPOutputStream doesn't do this; instead, the stream waits for more data so that it can compress more efficiently.

When you call finish(), the data is compressed and sent over the output stream, but the GZIPOutputStream (not the underlying stream) can no longer be written to, so we can't write more data until we create a new GZIPOutputStream, which costs time and performance.

Hope someone can help with this.
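
To make the workaround concrete, here is a minimal sketch of the finish-and-recreate approach described above. The class and method names (FinishPerMessageDemo, sendMessage, readAll) are made up for illustration, and a ByteArrayOutputStream stands in for the real network stream. It assumes a JDK whose GZIPInputStream reads concatenated gzip members.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class FinishPerMessageDemo {

    /** Hypothetical helper: writes one message as its own gzip member. */
    static void sendMessage(OutputStream shared, String message) throws IOException {
        // A new compressor per message: this is the cost we would like to avoid.
        GZIPOutputStream gzip = new GZIPOutputStream(shared);
        gzip.write(message.getBytes("UTF-8"));
        // finish() emits the remaining compressed data and the gzip trailer,
        // but leaves the shared stream open. We deliberately do not call close().
        gzip.finish();
        shared.flush();
    }

    /** Hypothetical helper: decompresses everything, across gzip members. */
    static String readAll(byte[] compressed) throws IOException {
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        int n;
        while ((n = in.read(buf)) != -1) {
            plain.write(buf, 0, n);
        }
        return plain.toString("UTF-8");
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        sendMessage(sink, "first ");
        sendMessage(sink, "second");
        System.out.println(readAll(sink.toByteArray()));
    }
}
```

Every message pays for a fresh gzip header, trailer, and compressor, and the compression dictionary is lost between messages, which is why we are looking for a real flush instead.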

Best regards.

A: 

Probably something like this will do what you need (I did not test it):

OutputStream notToBeClosed = ...
OutputStream guard = new FilterOutputStream(notToBeClosed) {
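    // flush() declares IOException, so close() must declare it as well to compile
    @Override
    public void close() throws IOException { flush(); /* don't close this.out */ }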
};
GZIPOutputStream zip = new GZIPOutputStream(guard);

// will call guard.close(), but that won't propagate to notToBeClosed.close()
zip.close();
giorgiga
As I said, the underlying stream is not the problem but the GZIPOutputStream. I want to force-flush the GZIPOutputStream without closing it.
Hemeroc
Sorry Hemeroc - couldn't you just close/discard that GZIPOutputStream and create a new one over the same OutputStream used for the first one? You want to do the equivalent of writing two gzip files in sequence on the same OutputStream - am I getting this right?
giorgiga
A: 

I haven't tried this yet, and this advice won't be useful until we have Java 7 in hand, but according to the documentation, GZIPOutputStream's flush() method (inherited from DeflaterOutputStream) relies on the flush mode specified at construction time with the syncFlush argument (related to Deflater#SYNC_FLUSH) to decide whether to flush the pending data to be compressed. This syncFlush argument is also accepted by GZIPOutputStream at construction time.

It sounds like you want to use either Deflater#SYNC_FLUSH or maybe even Deflater#FULL_FLUSH, but, before digging down that far, first try working with the two-argument (out, syncFlush) or the three-argument (out, size, syncFlush) GZIPOutputStream constructor and pass true for the syncFlush argument. That should activate the flushing behavior you desire.
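
As a rough sketch of what that would look like on Java 7 or later: the class name SyncFlushDemo and the helper bytesAfterFlush are made up for this example, and a ByteArrayOutputStream stands in for the real sink so the effect of flush() can be measured.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class SyncFlushDemo {

    /**
     * Hypothetical helper: writes a short message, calls flush(), and reports
     * how many bytes had reached the sink at that point (before close()).
     */
    static int bytesAfterFlush(boolean syncFlush) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Two-argument constructor (Java 7+): the second argument is syncFlush.
        GZIPOutputStream gzip = new GZIPOutputStream(sink, syncFlush);
        gzip.write("Haiii".getBytes("UTF-8"));
        gzip.flush();
        int visible = sink.size();
        gzip.close();
        return visible;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("syncFlush=false: " + bytesAfterFlush(false) + " bytes after flush()");
        System.out.println("syncFlush=true:  " + bytesAfterFlush(true) + " bytes after flush()");
    }
}
```

With syncFlush set to true, flush() should push the pending compressed data to the sink, so the second number is expected to be larger than the first. The stream stays open for further writes afterwards.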

seh
Hi, your answer is great if you are working with Java 7, which is not released at the moment. I'm working with Java 6 (as most users do).
Hemeroc
Oh, I'm sorry about that. You're right: these signatures are not yet available in Java 6. That serves me right for reading "the latest" documentation. We'll have to wait for these to arrive.
seh
A: 

Bug ID 4813885 covers this issue. The comment by "DamonHD", submitted on 9 Sep 2006 (about halfway through the bug report), contains an example of a FlushableGZIPOutputStream which he built on top of Jazzlib's net.sf.jazzlib.DeflaterOutputStream.

For reference, here's a (reformatted) extract:

/**
 * Substitute for GZIPOutputStream that maximises compression and has a usable
 * flush(). This is also more careful about its output writes for efficiency,
 * and indeed buffers them to minimise the number of write()s downstream which
 * is especially useful where each write() has a cost such as an OS call, a disc
 * write, or a network packet.
 */
public class FlushableGZIPOutputStream extends net.sf.jazzlib.DeflaterOutputStream {
    private final CRC32 crc = new CRC32();
    private final static int GZIP_MAGIC = 0x8b1f;
    private final OutputStream os;

    /** Set when input has arrived and not yet been compressed and flushed downstream. */
    private boolean somethingWritten;

    public FlushableGZIPOutputStream(final OutputStream os) throws IOException {
        this(os, 8192);
    }

    public FlushableGZIPOutputStream(final OutputStream os, final int bufsize) throws IOException {
        super(new FilterOutputStream(new BufferedOutputStream(os, bufsize)) {
            /** Suppress inappropriate/inefficient flush()es by DeflaterOutputStream. */
            @Override
            public void flush() {
            }
        }, new net.sf.jazzlib.Deflater(net.sf.jazzlib.Deflater.BEST_COMPRESSION, true));
        this.os = os;
        writeHeader();
        crc.reset();
    }

    public synchronized void write(byte[] buf, int off, int len) throws IOException {
        somethingWritten = true;
        super.write(buf, off, len);
        crc.update(buf, off, len);
    }

    /**
     * Flush any accumulated input downstream in compressed form. We overcome
     * some bugs/misfeatures here so that:
     * <ul>
     * <li>We won't allow the GZIP header to be flushed on its own without real compressed
     * data in the same write downstream. 
     * <li>We ensure that any accumulated uncompressed data really is forced through the 
     * compressor.
     * <li>We prevent spurious empty compressed blocks being produced from successive 
     * flush()es with no intervening new data.
     * </ul>
     */
    @Override
    public synchronized void flush() throws IOException {
        if (!somethingWritten) { return; }

        // We call this to get def.flush() called,
        // but suppress the (usually premature) out.flush() called internally.
        super.flush();

        // Since super.flush() seems to fail to reliably force output, 
        // possibly due to over-cautious def.needsInput() guard following def.flush(),
        // we try to force the issue here by bypassing the guard.
        int len;
        while((len = def.deflate(buf, 0, buf.length)) > 0) {
            out.write(buf, 0, len);
        }

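        // Really flush the stream below us...
        // (As extracted here, os is the raw stream passed to the constructor, not the
        // BufferedOutputStream wrapper, so this call does not flush the wrapper's buffer.)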
        os.flush();

        // Further flush()es are ignored until more input data is written.
        somethingWritten = false;
    }

    public synchronized void close() throws IOException {
        if (!def.finished()) {
            def.finish();
            do {
                int len = def.deflate(buf, 0, buf.length);
                if (len <= 0) { 
                    break;
                }
                out.write(buf, 0, len);
            } while (!def.finished());
        }

        // Write trailer
        out.write(generateTrailer());

        out.close();
    }

    // ...
}

You may find it useful.

BalusC
Hi, I have the same problems as Nardian. Any suggestions?
Hemeroc
A: 

BalusC - I already know that bug report (I have the same problem as Hemeroc). Please correct me if I did something wrong, but this test example just doesn't work the way I want it to:

protected static final int  port                = 12348;
private static boolean      serversocketOpen    = false;

public static void main(String[] args) {
    Thread server = new Thread(new Runnable() {

        @Override
        public void run() {
            System.out.println("server start");
            try {
                ServerSocket ss = new ServerSocket(FlushableGZIPTest.port);
                FlushableGZIPTest.serversocketOpen = true;
                Socket s = ss.accept();

                InputStream is = s.getInputStream();
                // ZInputStream zis = new ZInputStream(is);
                GZIPInputStream gis = new GZIPInputStream(is);
                // MyGZIPInputStream gis = new MyGZIPInputStream(is);

                byte[] test = new byte[1000];
                while (true) {
                    // Thread.sleep(900);
                    // System.out.println("available: " + gis.available());
                    System.out.println("server - reading         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
                    int read = gis.read(test);
                    System.out.println("server - " + read);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // } catch (InterruptedException e) {
                // e.printStackTrace();
            }

            System.out.println("server end");
        }
    });

    Thread client = new Thread(new Runnable() {

        @Override
        public void run() {
            System.out.println("client start");
            try {
                while (!FlushableGZIPTest.serversocketOpen) {
                    Thread.sleep(1);
                }

                Socket s = new Socket("127.0.0.1", FlushableGZIPTest.port);

                OutputStream os = s.getOutputStream();
                // GZIPOutputStream gos = new GZIPOutputStream(os);
                // MyGZIPOutputStream gos = new MyGZIPOutputStream(os);
                FlushableGZIPOutputStream gos = new FlushableGZIPOutputStream(os);

                Random r = new Random();
                byte[] test = new byte[10];
                test = "Haiii".getBytes();
                while (true) {
                    // r.nextBytes(test);
                    System.out.println("client - writing ~~~~~~~~~");
                    gos.write(test);
                    Thread.sleep(500);
                    System.out.println("client - flushing");
                    gos.flush();
                    /*
                     * after this, "server - Haiii" should be printed.. (in the server-thread)
                     */
                    Thread.sleep(1000);
                }

            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("client end");
        }
    });

    server.start();
    client.start();

}

The FlushableGZIPOutputStream is the one you showed here; I only added the writeHeader() function (copied from the original java.util.zip.GZIPOutputStream).
But as you can see, my example just doesn't work.

All GZIP streams are from the jazzlib implementation (even the GZIPInputStream in the server thread).

And please forgive me if this was the wrong way to ask for further help.

Nardian