So first here is some pseudo code of what I want to improve.

        public List<ProcessData> Processes;
        System.Threading.Thread ProcessThread;
        void ProcessLoop()
        {
            while (true)
            {
                for (int i = 0; i < Processes.Count; i++)
                {
                    if (HasPriority(Processes[i]))
                    {
                        Process(Processes[i]);
                    }
                }
                System.Threading.Thread.Sleep(1000);
            }

        }
        void AddProcessData(ProcessData pd)
        {
            Processes.Add(pd);
            if (Suspended(ProcessThread))
                Resume(ProcessThread);
        }
        void Startup()
        {
            ProcessThread = new System.Threading.Thread(ProcessLoop);
            ProcessThread.Start();
        }

So what I want to do is replace the 'Sleep' with code that will suspend the thread or have it wait until something is added to the list and then resume it. I also need it to be thread safe of course.

A: 

Get to know the Rx framework (Reactive Extensions) from Microsoft: http://rxwiki.wikidot.com/start. With it you can get an event every x seconds and do your work then, with no need to start threads at all.

Or use a timer and do the check in its elapsed event.
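A minimal sketch of the timer variant, assuming `System.Timers.Timer` and its `Elapsed` event are what is meant. `TimerProcessor`, the `Priority` and `Done` fields, and the stand-in `ProcessData` class are hypothetical names used only for illustration. Note that this still polls on an interval rather than waking only when something is added.

```csharp
using System;
using System.Collections.Generic;
using System.Timers;

// Hypothetical stand-in for the question's ProcessData type.
public class ProcessData
{
    public bool Priority;
    public volatile bool Done;
}

public class TimerProcessor : IDisposable
{
    private readonly object _gate = new object();
    private readonly List<ProcessData> _processes = new List<ProcessData>();
    private readonly System.Timers.Timer _timer;

    public TimerProcessor(double intervalMs)
    {
        // AutoReset defaults to true, so Elapsed fires repeatedly.
        _timer = new System.Timers.Timer(intervalMs);
        _timer.Elapsed += OnElapsed;
    }

    public void Startup()
    {
        _timer.Start();
    }

    public void AddProcessData(ProcessData pd)
    {
        lock (_gate) { _processes.Add(pd); }
    }

    // Runs on a thread-pool thread each time the interval elapses.
    private void OnElapsed(object sender, ElapsedEventArgs e)
    {
        ProcessData[] snapshot;
        lock (_gate) { snapshot = _processes.ToArray(); }

        foreach (ProcessData pd in snapshot)
        {
            if (pd.Priority)
            {
                pd.Done = true; // placeholder for the question's Process(pd)
            }
        }
    }

    public void Dispose()
    {
        _timer.Stop();
        _timer.Dispose();
    }
}
```

If one pass can take longer than the interval, `Elapsed` handlers may overlap on different thread-pool threads, so the real processing step would need to tolerate that.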

cRichter
+1  A: 

What you want is an event.

EDIT: to make it thread-safe.

    public List<ProcessData> Processes;
    System.Threading.Thread ProcessThread;
    System.Threading.AutoResetEvent ProcessesChangedEvent = new System.Threading.AutoResetEvent(false);
    void ProcessLoop()
    {
        while (true)
        {
            // You might want to copy out the entire list as an array instead 
            // if HasPriority or Process take a long time.
            lock (Processes)
            {
                for (int i = 0; i < Processes.Count; i++)
                {
                    if (HasPriority(Processes[i]))
                    {
                        Process(Processes[i]);
                    }
                }
            }
            ProcessesChangedEvent.WaitOne(...); // timeout?
        }

    }
    void AddProcessData(ProcessData pd)
    {
        lock (Processes)
            Processes.Add(pd);
        ProcessesChangedEvent.Set(); // you can also use Monitor.PulseAll/Wait
    }
    void Startup()
    {
        ProcessThread = new System.Threading.Thread(ProcessLoop);
        ProcessThread.Start();
    }
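The comment in the loop above suggests copying the list out as an array when `HasPriority` or `Process` are slow. A rough sketch of that variation follows, so that `AddProcessData` is only blocked for the duration of the copy. `SnapshotProcessor`, `Shutdown`, the `Priority` and `TimesProcessed` fields, and the stand-in `ProcessData` class are assumptions added for illustration; they are not part of the original code.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the question's ProcessData type.
public class ProcessData
{
    public bool Priority;
    public int TimesProcessed;
}

public class SnapshotProcessor
{
    private readonly List<ProcessData> _processes = new List<ProcessData>();
    private readonly AutoResetEvent _changed = new AutoResetEvent(false);
    private volatile bool _stop;
    private Thread _thread;

    private void ProcessLoop()
    {
        while (true)
        {
            // Read the stop flag before copying, so the final pass
            // still sees everything added before Shutdown was called.
            bool stopping = _stop;

            ProcessData[] snapshot;
            lock (_processes) { snapshot = _processes.ToArray(); }

            // The lock is released here, so AddProcessData is not blocked
            // while the (possibly slow) processing runs.
            foreach (ProcessData pd in snapshot)
            {
                if (pd.Priority)
                {
                    Interlocked.Increment(ref pd.TimesProcessed); // placeholder for Process(pd)
                }
            }

            if (stopping) break;
            _changed.WaitOne(); // returns at once if Set was called in the meantime
        }
    }

    public void AddProcessData(ProcessData pd)
    {
        lock (_processes) { _processes.Add(pd); }
        _changed.Set();
    }

    public void Startup()
    {
        _thread = new Thread(ProcessLoop) { IsBackground = true };
        _thread.Start();
    }

    public void Shutdown()
    {
        _stop = true;
        _changed.Set(); // wake the worker so it can observe the flag
        _thread.Join();
    }
}
```

As in the original, items are never removed, so every pass revisits the whole list.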
wj32
How would I make it thread safe?
This code doesn't remove the items that have been processed, and it will not notice a newly added item until the worker's next iteration. It will also block the AddProcessData call for the duration of the processing loop in the worker.
ULysses
The AutoResetEvent is thread safe though, right?
@Ulysses: This is pseudo-code. It doesn't matter as long as it gets the point across, which it did.
AutoResetEvent is not just thread safe, it ensures the thread safety ;)
ULysses
Woah, where do you get that from? AutoResetEvent neither ensures nor takes away thread safety.
wj32
The Process function may take a while, and as long as it is executed under the same lock as the `Processes.Add(pd)` in `AddProcessData`, it will block the thread that is adding another item to process, which effectively makes the whole effort of asynchronous processing useless.
ULysses
@wj32: it is a tool to make things thread safe, isn't it?
ULysses
How would you lock the Event to make it threadsafe too?
@ULysses: Which is why he added that comment about copying it...
@high6: Instance methods on all `WaitHandle` classes are thread-safe, which means you do not have to wrap them in a lock. In fact, if you did (at least in the case of `WaitOne`) the `ProcessLoop` thread would deadlock. By the way, if you use this code make sure you use it **exactly** as written. The slightest deviation, especially moving the calls to `Set` and `WaitOne`, will livelock the `ProcessLoop` thread or otherwise make it break down in unpredictable ways. Of course, using a blocking queue is a more acceptable pattern here as it is not as brittle; see my answer.
Brian Gideon
@high6: I should also point out this approach effectively serializes the adding of new items and the processing of existing ones. I am not saying there is anything wrong with that. It is just something to be aware of. The blocking queue approach would not suffer the same problem. Of course, it would not be difficult to change the code here to resolve that bottleneck.
Brian Gideon
+1  A: 

In your case the best decision is to use one of the following classes: ManualResetEvent or AutoResetEvent.
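To illustrate the difference between the two classes, here is a small sketch (`ResetEventDemo` and `Run` are made-up names): an `AutoResetEvent` resets itself once a single wait has been satisfied, while a `ManualResetEvent` stays signaled until `Reset` is called.

```csharp
using System.Threading;

public static class ResetEventDemo
{
    // Returns the results of five zero-timeout waits, in order.
    public static bool[] Run()
    {
        using (var auto = new AutoResetEvent(false))
        using (var manual = new ManualResetEvent(false))
        {
            auto.Set();
            manual.Set();

            // WaitOne(0) tests the state without blocking.
            bool autoFirst = auto.WaitOne(0);       // true, and the event resets itself
            bool autoSecond = auto.WaitOne(0);      // false, already reset

            bool manualFirst = manual.WaitOne(0);   // true
            bool manualSecond = manual.WaitOne(0);  // true, still signaled

            manual.Reset();
            bool manualAfterReset = manual.WaitOne(0); // false

            return new[] { autoFirst, autoSecond, manualFirst, manualSecond, manualAfterReset };
        }
    }
}
```

For a single worker that should wake once per notification, the self-resetting behavior of `AutoResetEvent` is usually the more convenient of the two.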

zenonych
A: 

Just in case you really need to process the data only once, here is some code:


        // Initialize the queue; locking on a null reference would throw.
        public Queue<ProcessData> Processes = new Queue<ProcessData>();
        System.Threading.Thread[] ProcessThreads = new System.Threading.Thread[5];
        System.Threading.Semaphore semToDo = new System.Threading.Semaphore(0,int.MaxValue);

        void ProcessLoop()
        {
            ProcessData pd; 

            while (true)
            {
                semToDo.WaitOne();
                lock (Processes)
                {
                    pd = Processes.Dequeue();                    
                }
                Process(pd);
            }
        }

        private void Process(ProcessData processData)
        {
            throw new NotImplementedException();
        }

        void AddProcessData(ProcessData pd)
        {
            lock (Processes)
            {
                Processes.Enqueue(pd);
            }
            semToDo.Release();
        }
        void Startup()
        {
            //you can even have multiple worker threads now!

            for(int i = 0; i < 5; i++)
                ProcessThreads[i] = new System.Threading.Thread(ProcessLoop);
            foreach (System.Threading.Thread t in ProcessThreads)
            {
                t.Start();
            }
        }
ULysses
@downrater: please explain yourself
ULysses
+1  A: 

What you really need is a BlockingQueue. You can find a quality implementation here. Or, if you are using v4 of the .NET Framework, you can use the built-in BlockingCollection class. Notice how much easier the code is when using this type of data structure.

public class YourClass
{
  private BlockingQueue<ProcessData> m_Queue = new BlockingQueue<ProcessData>();

  private void ProcessLoop()
  {
    while (true)
    {
      ProcessData item = m_Queue.Dequeue();
      if (HasPriority(item))
      {
        Process(item);
      }
    }
  }

  public void AddProcessData(ProcessData item)
  {
    m_Queue.Enqueue(item);
  }

  public void Startup()
  {
    var thread = new Thread(() => { ProcessLoop(); });
    thread.Start();
  }
}
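For the .NET 4 route, a comparable sketch using `BlockingCollection<T>` from `System.Collections.Concurrent` might look like the following. `BlockingCollectionProcessor`, `Shutdown`, the `Priority` and `Name` fields, and the stand-in `ProcessData` class are assumptions for illustration, and recording the item's name stands in for the real `Process` call.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the question's ProcessData type.
public class ProcessData
{
    public bool Priority;
    public string Name;
}

public class BlockingCollectionProcessor
{
    // Backed by a ConcurrentQueue by default, so items come out in FIFO order.
    private readonly BlockingCollection<ProcessData> _queue =
        new BlockingCollection<ProcessData>();

    // Touched only by the worker thread, and read by the caller after Join.
    private readonly List<string> _processed = new List<string>();
    private Thread _thread;

    private void ProcessLoop()
    {
        // Blocks while the queue is empty; the loop ends once
        // CompleteAdding has been called and the queue is drained.
        foreach (ProcessData item in _queue.GetConsumingEnumerable())
        {
            if (item.Priority)
            {
                _processed.Add(item.Name); // placeholder for Process(item)
            }
        }
    }

    public void AddProcessData(ProcessData item)
    {
        _queue.Add(item);
    }

    public void Startup()
    {
        _thread = new Thread(ProcessLoop);
        _thread.Start();
    }

    // Stops accepting items, waits for the worker, and returns what it handled.
    public List<string> Shutdown()
    {
        _queue.CompleteAdding();
        _thread.Join();
        return _processed;
    }
}
```

As in the code above, an item that fails the priority check is taken off the queue and dropped rather than revisited later.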
Brian Gideon
BlockingQueue is a nice one, I didn't know it was added. In the implementation that you linked to, I would add some locking on Enqueue and Dequeue, since these methods in Queue are not thread safe.
ULysses
@ULysses: The link I provided actually had several different implementations all of which correctly handle thread-safety issues already. No additional thread synchronization mechanisms are needed for the code in my answer as well.
Brian Gideon
@Brian Gideon: I mean this link you gave http://blogs.msdn.com/b/toub/archive/2006/04/12/blocking-queues.aspx
ULysses
@ULysses: Yep, they already have the correct synchronization mechanisms in place. In fact, all of them even use the `lock` keyword in both the `Enqueue` and `Dequeue` methods.
Brian Gideon