I am completely perplexed by how my Queue is functioning. I am attempting (and failing) to write a small multi-threaded application to collect and display data in C#.
After reading through Albahari's book and using the Consumer/Producer pattern he describes, I got most of it to work, except that my data seems to get scrambled in the queue. Before being queued, the fields in my object have the following values

timeStamp = 6
data[] ={4936, 9845, 24125, 44861}

After being dequeued the data looks like

timeStamp = 6
data[] = {64791, 19466, 47772, 65405}

I don't understand why the values in the data field are being changed after the dequeue. I am perplexed, so I thought I'd throw it out there to see if anyone can point me in the right direction to fix this, or point me in a different direction to proceed.


Relevant Code


Custom Object for data storage

Relevant objects and fields. The class sensorData is a separate class used to store my calculations.

public class sensorData
{
    public const int bufSize = 4;
    public UInt16[] data = new UInt16[4];
    public double TimeStamp = 0; 
    public int timeIndex = 0;
}

The following fields are used to set up the queue and the signals between the enqueue and dequeue threads.

EventWaitHandle wh = new AutoResetEvent(false);
Queue<sensorData> dataQ = new Queue<sensorData>();
object locker = new object();

Enqueue Method/Thread

This is my worker thread. It calculates four sine curves and queues the results for processing. I also write the results to a file so I know what it has calculated.

private void calculateAndEnqueueData(BackgroundWorker worker, DoWorkEventArgs e)
{
    int j = 0;
    double time = 0;
    double dist;
    UInt16[] intDist = new UInt16[sensorData.bufSize];
    UInt16 uint16Dist;

    // Frequencies of the four Sine curves
    double[] myFrequency = { 1, 2, 5, 10 };

    // Creates the output file.
    StreamWriter sw2 = File.CreateText("c:\\tmp\\QueuedDataTest.txt"); 

    // Main loop to calculate my Sine curves
    while (!worker.CancellationPending)
    {
        // Calculate four Sine curves
        for (int i = 0; i < collectedData.numberOfChannels; i++)
        {
            dist = Math.Abs(Math.Sin(2.0 * Math.PI * myFrequency[i] * time));
            uint16Dist = (UInt16)dist;
            intDist[i] = uint16Dist;
        }

        // Bundle the results and Enqueue them
        sensorData dat = new sensorData();
        dat.data = intDist;
        dat.TimeStamp = time;
        dat.timeIndex = j;

        lock (locker) dataQ.Enqueue(dat);
        wh.Set();

        // Output results to file.
        sw2.Write(j.ToString() + ", ");
        foreach (UInt16 dd in dat.intData)
        {
            sw2.Write(dd.ToString() + ", ");
        }
        sw2.WriteLine();

        // Increments time and index.
        j++;
        time += 1 / collectedData.signalFrequency;

        Thread.Sleep(2);
    }
    // Clean up: close the output file, then enqueue null to tell the consumer to stop.
    sw2.Close();
    lock (locker) dataQ.Enqueue(null);
    wh.Set();
}

Example line in the output file QueuedDataTest.txt

6, 4936, 9845, 24125, 44861,

Dequeue Data Method

This method dequeues elements from the queue and writes them to a file until a null element is found on the queue, at which point the job is done.

    private void dequeueDataMethod()
    {
        StreamWriter sw = File.CreateText("C:\\tmp\\DequeueDataTest.txt");

        while (true)
        {
            sensorData data = null;

            // Dequeue available element if any are there.
            lock (locker)
                if (dataQ.Count > 0)
                {
                    data = dataQ.Dequeue();
                    if (data == null)
                    {
                        sw.Close();
                        return;
                    }
                }

            // Check to see if an element was dequeued. If it was write it to file.
            if (data != null)
            {
                sw.Write(data.timeIndex.ToString() + ", ");
                foreach (UInt16 dd in data.data)
                    sw.Write(dd.ToString() + ", ");
                sw.WriteLine();
            }
            else
            {
                wh.WaitOne();
            }
        }
    }

Output result after dequeueing the data and writing it to DequeueDataTest.txt

6, 64791, 19466, 47772, 65405,


Update 1:

Location of Locks in current code.


I have edited the code to place locks around the code that writes data to the file. The code blocks I have locks around are as follows.

In the CalculateAndEnqueueData() method I have

lock (locker) dataQ.Enqueue(dat);
wh.Set();

lock(locker)
{
  sw2.Write(j.ToString() + ", ");
  foreach (UInt16 dd in dat.intData)
  {
     sw2.Write(dd.ToString() + ", ");
  }
  sw2.WriteLine();
}

In dequeueDataMethod() I have two areas with locks. The first is here

lock(locker) 
    if (dataQ.Count > 0)
    {
       data = dataQ.Dequeue();
       if (data == null)
       {
           sw.Close();
           return;
        }
    }

which I assume holds the lock on locker for the code in the if block. The second is where I write to the file, here

lock (locker)
{
    sw.Write(data.timeIndex.ToString() + ", ");
    foreach (UInt16 dd in data.intData)
        sw.Write(dd.ToString() + ", ");
    sw.WriteLine();
    if (icnt > 10)
    {
        sw.Close();
        return;
    }
    this.label1.Text = dataQ.Count.ToString();
}

That's all of them.


+9  A: 

The problem is due to no synchronisation on the StreamWriter you are writing to. The order is not sequential.

Mitch Wheat
Yes, I need to place a lock around the code that writes to the file. Thanks. That is exactly what I needed!
Azim
I thought you said each thread is writing to a separate file? I don't think synchronization of the StreamWriter, which is defined locally, is the problem here. Can you verify this?
Zach Scrivena
@Zach Scrivena: yes each thread writes to a different file (test.txt and test2.txt). All I know is that locking locker around that code block seems to help. I don't know why.
Azim
It only seems to work because intDist is being written and read "as a block"... however I think this is still not correct. To see this, put a 2-second delay in calculateAndEnqueueData() and a 4-second delay in dequeueDataMethod() and see what happens.
Zach Scrivena
@Zach Scrivena: OK, I added the delays as you suggested. The code seems to work. I let it run for about 1 minute and it behaves fine. Both files are identical.
Azim
What if you remove the delay totally in calculateAndEnqueueData()? Btw, the delay in dequeueDataMethod() is outside the lock right?
Zach Scrivena
@Zach Scrivena: Yes, my Thread.Sleep calls are outside the lock. Anyway, I removed the Thread.Sleep from calculateAndEnqueueData(). It still works?!
Azim
Hmm... that's puzzling... let's continue our discussion at my answer, to prevent polluting Mitch's answer. (Sorry!)
Zach Scrivena
+1  A: 

Is it because you are writing to the same array UInt16[] intDist over and over again? Shouldn't you be using separate arrays for each sensorData object? (Btw, is sensorData.intData supposed to be sensorData.data in your sample code?)

CLARIFICATION:

Only one intDist array is created in calculateAndEnqueueData(), so different sensorData instances all share the same array. This is OK only if the adding+writing and removal+writing occur in lock-step; otherwise, the producer overwrites the array while queued items still reference it, and some data points may be missing or repeated.
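
To make the sharing concrete, here is a minimal single-threaded sketch. It is not the code from the question: SensorData, AliasingDemo and Run are hypothetical names, and the values are made up. It enqueues several items either sharing one buffer or each holding its own copy, then dequeues them and reports the first value of each.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the question's sensorData class.
public class SensorData
{
    public ushort[] Data = new ushort[4];
    public int TimeIndex;
}

public static class AliasingDemo
{
    // Enqueues `count` items, then dequeues them and returns Data[0] of each.
    // When `copy` is false, every item references the same buffer.
    public static List<ushort> Run(int count, bool copy)
    {
        var queue = new Queue<SensorData>();
        var shared = new ushort[4];

        for (int j = 0; j < count; j++)
        {
            // Overwrite the one buffer with this iteration's values.
            for (int i = 0; i < shared.Length; i++)
                shared[i] = (ushort)(j * 10 + i);

            var item = new SensorData { TimeIndex = j };
            item.Data = copy ? (ushort[])shared.Clone() : shared;
            queue.Enqueue(item);
        }

        var firstValues = new List<ushort>();
        while (queue.Count > 0)
            firstValues.Add(queue.Dequeue().Data[0]);
        return firstValues;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(3, copy: false))); // 20, 20, 20
        Console.WriteLine(string.Join(", ", Run(3, copy: true)));  // 0, 10, 20
    }
}
```

With the shared buffer, every dequeued item shows whatever was written last. In the threaded version the consumer sees whatever the producer happens to have written at the moment it reads, which would explain values that differ from what was logged at enqueue time.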

SUGGESTION:

Populate sensorData instances directly, without using the intDist array, in calculateAndEnqueueData():

    // create new sensorData instance
    sensorData dat = new sensorData();
    dat.TimeStamp = time;
    dat.timeIndex = j;

    // Calculate four Sine curves
    for (int i = 0; i < collectedData.numberOfChannels; i++)
    {
        dat.data[i] = (UInt16) Math.Abs(Math.Sin(2.0 * Math.PI * myFrequency[i] * time));
    }

    // enqueue
    lock (locker) dataQ.Enqueue(dat);
    wh.Set(); // signal the consumer, as in the original loop
Zach Scrivena
yes, thank you. Typo from cutting my code down. Originally, sensorData had a few more fields that are used which I removed for the question.
Azim
No, I don't think so, because the producer thread, calculateAndEnqueueData, which writes to the UInt16[], produces the correct data. The data is scrambled when it is dequeued in the consumer thread, dequeueDataMethod.
Azim
That's because they are being written "as a block"... when dequeuing, the same array is being modified (though for a different sensorData object) by calculateAndEnqueueData, thus the wrong results.
Zach Scrivena
Seems like your code is writing and reading sensorData objects in lock-step. Is that the intention? (Check if the queue length ever exceeds one... I suspect it doesn't.)
Zach Scrivena
Yes the queue length will grow if I slow down the dequeue. So that queue length can grow greater than one.
Azim
posted a followup question to continue.
Azim
Are the numbers still correct when the queue grows?
Zach Scrivena
yes they are still correct.
Azim
Just to clarify, where are the locks in your code now?
Zach Scrivena
Will edit question to clarify.
Azim
Thanks for clarifying. Quick question: The code only writes to either one file at any given time, so what is the reason behind multithreading? In other words, what parts of the code are you trying to run in parallel?
Zach Scrivena
Well, I have methods for reading data coming in from a serial port, converting this data, and displaying it in a graph. I thought it would be good if I could collect the data on a separate thread from the processing and display thread.
Azim
I see. So calculateAndEnqueueData() = serial port thread, and dequeueDataMethod() = processing/display thread?
Zach Scrivena
yup. That would be the end goal.
Azim
Ok, I've updated my answer to explain my concern.
Zach Scrivena
Btw, I also notice that in class sensorData, you've created the array data which is never used since it is reassigned in calculateAndEnqueueData().
Zach Scrivena
Thanks for your input. If I change dat.data = intDist to dat.data = (UInt16[]) intDist.Clone(); I don't need the sleeps and locks. Can you include this as the solution in your answer?
Azim
Ok, I've updated my answer with a suggestion and some sample code. Hope it helps =)
Zach Scrivena
Thanks. Your input was extremely helpful in debugging this. The suggestion makes sense and the code works well now.
Azim