tags:

views:

113

answers:

1

I'm trying to write a buffermanager that manages 3 Streams. The typical usage would be with a slow producer and a fast consumer. The idea behind the three buffers is that the producer ALWAYS has a buffer to write in and the consumer ALWAYS gets the latest data produced.

Now i already have this, and it sort-off works.

namespace YariIfStream
{

    /// <summary>
    /// A class that manages three buffers used for IF data streams
    /// </summary>
    public class YariIFStream
    {
        private Stream writebuf; ///<value>The stream used for writing</value>
        private Stream readbuf; ///<value>The stream used for reading</value>
        private Stream swapbuf; ///<value>The stream used for swapping</value>
        private bool firsttime; ///<value>Boolean used for checking if it is the first time a writebuffers is asked</value>
        private Object sync; ///<value>Object used for syncing</value>

        /// <summary>
        /// Initializes a new instance of the Yari.YariIFStream class with expandable buffers
        /// </summary>
        public YariIFStream()
        {
            sync = new Object();
            eerste = true;

            writebuf = new MemoryStream();
            readbuf = new MemoryStream();
            swapbuf = new MemoryStream();
        }

        /// <summary>
        /// Returns the stream with the buffer with new data ready to be read
        /// </summary>
        /// <returns>Stream</returns>
        public Stream GetReadBuffer()
        {
            lock (sync)
            {
                Monitor.Wait(sync);
                Stream tempbuf = swapbuf;
                swapbuf = readbuf;
                readbuf = tempbuf;
            }
            return readbuf;
        }

        /// <summary>
        /// Returns the stream with the buffer ready to be written with data
        /// </summary>
        /// <returns>Stream</returns>
        public Stream GetWriteBuffer()
        {
            lock (sync)
            {
                Stream tempbuf = swapbuf;
                swapbuf = writebuf;
                writebuf = tempbuf;
                if (!firsttime)
                {
                    Monitor.Pulse(sync);
                }
                else
                {
                    firsttime = false;

                }
            }
            //Thread.Sleep(1);
            return writebuf;
        }

    }
}

The firsttime check is used because the first time a writebuffer is asked, it can not pulse the consumer because the buffer still has to be written with data. When a writebuffer is asked a second time, we can be sure the previous buffer contains data.

I have two threads, one producer and one consumer. This is my output:

prod: uv_hjd`alv   cons: N/<]g[)8fV
prod: N/<]g[)8fV   cons: 5Ud*tJ-Qkv
prod: 5Ud*tJ-Qkv   cons: 4Lx&Z7qqjA
prod: 4Lx&Z7qqjA   cons: kjUuVyCa.B
prod: kjUuVyCa.B

Now it's ok the consumer lags one behind, it is supposed to do that. As you can see i lose my first string of data wich is my main problem.

The other problems are this:

  • if i remove the firsttime check, it works. But it shouldn't in my opinion...
  • if i add a Thread.Sleep(1); in the GetWriteBuffer() it also works. Something i don't understand.

Thanks in advance for any enlightenment.

A: 

I've fixed my problem. I replaced all the Stream instances with byte[]. Now it works fine. Don't know why Stream would not work, do not want to spent more time figuring this out.

Here's the new code for anyone running into the same problem.

/// <summary>
/// This namespace provides a crossthread-, concurrentproof buffer manager. 
/// </summary>
namespace YariIfStream
{

    /// <summary>
    /// A class that manages three buffers used for IF data streams
    /// </summary>
    public class YariIFStream
    {
        private byte[] writebuf; ///<value>The buffer used for writing</value>
        private byte[] readbuf; ///<value>The buffer used for reading</value>
        private byte[] swapbuf; ///<value>The buffer used for swapping</value>
        private bool firsttime; ///<value>Boolean used for checking if it is the first time a writebuffers is asked</value>
        private Object sync; ///<value>Object used for syncing</value>

        /// <summary>
        /// Initializes a new instance of the Yari.YariIFStream class with expandable buffers with a initial capacity as specified
        /// </summary>
        /// <param name="capacity">Initial capacity of the buffers</param>
        public YariIFStream(int capacity)
        {
            sync = new Object();
            firsttime = true;

            writebuf = new byte[capacity];
            readbuf = new byte[capacity];
            swapbuf = new byte[capacity];
        }

        /// <summary>
        /// Returns the buffer with new data ready to be read
        /// </summary>
        /// <returns>byte[]</returns>
        public byte[] GetReadBuffer()
        {
            byte[] tempbuf;
            lock (sync)
            {
                Monitor.Wait(sync);
                tempbuf = swapbuf;
                swapbuf = readbuf;
            }
            readbuf = tempbuf;

            return readbuf;
        }

        /// <summary>
        /// Returns the buffer ready to be written with data
        /// </summary>
        /// <returns>byte[]</returns>
        public byte[] GetWriteBuffer()
        {
            byte[] tempbuf;
            lock (sync)
            {
                tempbuf = swapbuf;
                swapbuf = writebuf;

                writebuf = tempbuf;

                if (!firsttime)
                {
                    Monitor.Pulse(sync);
                }
                else
                {
                    firsttime = false;
                }
            }
            return writebuf;
        }
    }
}
Xtreme_Machine