views:

85

answers:

2

Basically i have multi threads that adds data into a queue via SQLite. I have another one thread that pulls them and process them one at a time (too much resource to do multiple at once). The processing thread does this:

  1. pull data from DB
  2. foreach { proccess }
  3. if count == 0 { thread.suspend() } (waken by thread.resume())
  4. repeat

my worker thread does:

  1. Validates data
  2. Inserts into DB
  3. call Queue.Poke(QueueName)

When I poke it, if the thread is suspended I .resume() it.

What I am worried about is if the process thread sees count==0, my worker inserts and pokes then my process continues down the if and sleeps. It won't realize there is something new in the DB.

How should I write this in such a way that I won't have a race condition.

A: 

I think this may be the structure you need.

private readonly Queue<object> _Queue = new Queue<object>();
private readonly object _Lock = new object();

void FillQueue()
{
    while (true)
    {
        var dbData = new { Found = true, Data = new object() };
        if (dbData.Found)
        {
            lock (_Lock)
            {
                _Queue.Enqueue(dbData.Data);
            }
        } 

        // If you have multiple threads filling the queue you
        // probably want to throttle it a little bit so the thread
        // processing the queue won't be throttled.
        // If 1ms is too long consider using 
        // TimeSpan.FromTicks(1000).

        Thread.Sleep(1);
    }       
}

void ProcessQueue()
{
    object data = null;

    while (true)
    {
        lock (_Lock)
        {
            data = _Queue.Count > 0 ? _Queue.Dequeue() : null;
        }

        if (data == null)
        {
            Thread.Sleep(1);
        }
        else
        {
            // Proccess
        }         
    }        
}
ChaosPandion
What he is doing here is using the Queue as the mechanism of notification instead of the Thread.resume() that the OP was considering. As long as there is "something in the queue", then the loop runs to check the DB.
Will Hartung
+1  A: 

Processing thread:

  1. event.Reset
  2. pull data from DB
  3. foreach { proccess }
  4. if count == 0 then event.Wait
  5. repeat

And the other thread:

  1. Validates data
  2. Inserts into DB
  3. event.Set()

You'll have extra wakes (wake on an empty queue, nothing to process, go back to sleep) but you won't have missed inserts.

Remus Rusanu
I dont understand event.Reset, what event.Wait may be (sounds like i am not using the thread class) and event.set hmm, if this wont race then event.wait wont block unless event is reset and clear... interesting. What class do i use?
acidzombie24
event is a ManualResetEvent instance both thread have a reference to. The key part is the order: processing first Reset, then dequeue and the worker first enqueue then Set. This ensures there is no window when the processing goes to sleep and misses a worker enqueued item.
Remus Rusanu
Excellent answer.
acidzombie24