views:

133

answers:

6

Hello,

I am interested to get some ideas from you about what would be a good/better threading architecture that respects the rules described below:

  • A thread must be running for the life of the application, in the sleep/wait mode if there is no work in the queue to be performed.

  • A thread must be of a BelowNormal priority (this eliminates possibility of using ThreadPool).

  • The thread must give its feedback to the main thread upon completion of task.

  • Thread will monitor a Queue<T> to get more jobs to be performed.

I am using .Net Framework 4.0

Let me know what you think :)

+1  A: 

By reading above conditions

Some questions

1- Is there any other thread which will populate jobs in Queue< T > ?

if the answer is yes than Producer / Consumer Deign Pattern can be used here i am not aware of .net 4.0 but this design can be implemented in .net 3.5.

See here for example.

saurabh
Yes Queue will be populated with another thread
Ruby
It is important to note the article linked to here regarding the producer/consumer pattern is incorrect. It contains a very subtle, but nasty bug that can cause it to get live-locked. Microsoft really needs to banish this article.
Brian Gideon
A: 

A thread pool sounds like just the thing. Actually, you can change the priority of .NET's own thread pool by setting the process priority. Bump the process priority down a notch and your UI up a notch and you should have a UI at normal priority and thread pool at lower priority.

Ben Voigt
I wouldn't recomment changing the process priority. Also if you use BELOW_NORMAL_PRIORITY_CLASS for the process, you'd have to use THREAD_PRIORITY_HIGHEST for the GUI thread to get the same priority that a NORMAL_PRIORITY_CLASS/THREAD_PRIORITY_NORMAL thread has. See http://msdn.microsoft.com/en-us/library/ms685100(VS.85).aspx
pgroke
+3  A: 

When I need to implement my own multi-threaded processing, I usually use something like this:

public class MyWorker<T> : IDisposable
{
    private readonly Queue<T> _taskQueue; // task queue
    private readonly object _threadLock = new object();
    private Thread _thread; // worker thread
    private ManualResetEvent _evExit;
    private AutoResetEvent _evNewData;

    /// <summary>Override this to process data.</summary>
    protected abstract void ProcessData(T data);

    /// <summary>Override this to set other thread priority.</summary>
    protected virtual ThreadPriority ThreadPriority
    {
        get { return ThreadPriority.BelowNormal; }
    }

    protected MyWorker()
    {
        _taskQueue = new Queue<T>();
        _evExit = new ManualResetEvent(false);
        _evNewData = new AutoResetEvent(false);
    }

    ~MyWorker()
    {
        Dispose(false);
    }

    private void ThreadProc()
    {
        try
        {
            var wh = new WaitHandle[] { _evExit, _evNewData };
            while(true)
            {
                T data = default(T);
                bool gotData = false;
                lock(_taskQueue) // sync
                {
                    if(_taskQueue.Count != 0) // have data?
                    {
                        data = _taskQueue.Dequeue();
                        gotData = true;
                    }
                }
                if(!gotData)
                {
                    if(WaitHandle.WaitAny(wh) == 0) return; // demanded stop
                    continue; //we have data now, grab it
                }
                ProcessData(data);
                if(_evExit.WaitOne(0)) return;
            }
        }
        catch(ThreadInterruptedException)
        {
            // log warning - this is not normal
        }
        catch(ThreadAbortException)
        {
            // log warning - this is not normal
        }
    }

    public void Start()
    {
        lock(_threadLock)
        {
            if(_thread != null)
                throw new InvalidOperationException("Already running.");
            _thread = new Thread(ThreadProc)
            {
                Name = "Worker Thread",
                IsBackground = true,
                Priority = ThreadPriority,
            };
            _thread.Start();
        }
    }

    public void Stop()
    {
        lock(_threadLock)
        {
            if(_thread == null)
                throw new InvalidOperationException("Is not running.");
            _evExit.Set();
            if(!_thread.Join(1000))
                _thread.Abort();
            _thread = null;
        }
    }

    /// <summary>Enqueue data for processing.</summary>
    public void EnqueueData(T data)
    {
        lock(_taskQueue)
        {
            _taskQueue.Enqueue(data);
            _evNewData.Set(); // wake thread if it is sleeping
        }
    }

    /// <summary>Clear all pending data processing requests.</summary>
    public void ClearData()
    {
        lock(_taskQueue)
        {
            _taskQueue.Clear();
            _evNewData.Reset();
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        lock(_threadLock)
        {
            if(_thread != null)
            {
                _evExit.Set();
                if(!_thread.Join(1000))
                    _thread.Abort();
                _thread = null;
            }
        }
        _evExit.Close();
        _evNewData.Close();
        if(disposing)
            _taskQueue.Clear();
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}
max
+3  A: 
  • A thread must be of a BelowNormal priority (this eliminates possibility of using ThreadPool).

This seems to be the main stumbling block for using the TPL and ThreadPool. Are you sure you're not over-estimating the usefulness of a lower priority?

You will have to put in a lot of work to come up with something that will always be much less powerful (and much less tested/reliable) than the TPL.

I would reconsider this.

Henk Holterman
+1  A: 

Personally I roll my own usually, because I like having much tighter control.

I use this in Media Browser:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using MediaBrowser.Library.Logging;

namespace MediaBrowser.Library.Threading {

    public static class Async {

        public const string STARTUP_QUEUE = "Startup Queue";

        class ThreadPool {
            List<Action> actions = new List<Action>();
            List<Thread> threads = new List<Thread>();
            string name;
            volatile int maxThreads = 1;

            public ThreadPool(string name) {
                Debug.Assert(name != null);
                if (name == null) {
                    throw new ArgumentException("name should not be null");
                }
                this.name = name;
            }


            public void SetMaxThreads(int maxThreads) {
                Debug.Assert(maxThreads > 0);
                if (maxThreads < 1) {
                    throw new ArgumentException("maxThreads should be larger than 0");
                }

                this.maxThreads = maxThreads;
            }

            public void Queue(Action action, bool urgent) {
                Queue(action, urgent, 0);
            }

            public void Queue(Action action, bool urgent, int delay) {

                if (delay > 0) {
                    Timer t = null;
                    t = new Timer(_ =>
                    {
                        Queue(action, urgent, 0);
                        t.Dispose();
                    }, null, delay, Timeout.Infinite);
                    return;
                }

                lock (threads) {
                    // we are spinning up too many threads
                    // should be fixed 
                    if (maxThreads > threads.Count) {
                        Thread t = new Thread(new ThreadStart(ThreadProc));
                        t.IsBackground = true;
                        // dont affect the UI.
                        t.Priority = ThreadPriority.Lowest;
                        t.Name = "Worker thread for " + name;
                        t.Start();
                        threads.Add(t);
                    }
                }

                lock (actions) {
                    if (urgent) {
                        actions.Insert(0, action);
                    } else {
                        actions.Add(action);
                    }

                    Monitor.Pulse(actions);
                }
            }

            private void ThreadProc() {

                while (true) {

                    lock (threads) {
                        if (maxThreads < threads.Count) {
                            threads.Remove(Thread.CurrentThread);
                            break;
                        }
                    }

                    List<Action> copy;

                    lock (actions) {
                        while (actions.Count == 0) {
                            Monitor.Wait(actions);
                        }
                        copy = new List<Action>(actions);
                        actions.Clear();
                    }

                    foreach (var action in copy) {
                        action();
                    }
                }
            }
        }


        static Dictionary<string, ThreadPool> threadPool = new Dictionary<string, ThreadPool>();

        public static Timer Every(int milliseconds, Action action) {
            Timer timer = new Timer(_ => action(), null, 0, milliseconds);
            return timer;
        }

        public static void SetMaxThreads(string uniqueId, int threads) {
            GetThreadPool(uniqueId).SetMaxThreads(threads);
        }

        public static void Queue(string uniqueId, Action action) {
            Queue(uniqueId, action, null);
        }

        public static void Queue(string uniqueId, Action action, int delay) {
            Queue(uniqueId, action, null,false, delay);
        }

        public static void Queue(string uniqueId, Action action, Action done) {
            Queue(uniqueId, action, done, false);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent) {
            Queue(uniqueId, action, done, urgent, 0);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent, int delay) {

            Debug.Assert(uniqueId != null);
            Debug.Assert(action != null);

            Action workItem = () =>
            {
                try {
                    action();
                } catch (ThreadAbortException) { /* dont report on this, its normal */ } catch (Exception ex) {
                    Debug.Assert(false, "Async thread crashed! This must be fixed. " + ex.ToString());
                    Logger.ReportException("Async thread crashed! This must be fixed. ", ex);
                }
                if (done != null) done();
            };

            GetThreadPool(uniqueId).Queue(workItem, urgent, delay);
        }

        private static ThreadPool GetThreadPool(string uniqueId) {
            ThreadPool currentPool;
            lock (threadPool) {
                if (!threadPool.TryGetValue(uniqueId, out currentPool)) {
                    currentPool = new ThreadPool(uniqueId);
                    threadPool[uniqueId] = currentPool;
                }
            }
            return currentPool;
        }
    }

}

It has a fairly elegant API, only feature I would like to add one day is scavenging empty thread pools.

Usage:

 // Set the threads for custom thread pool 
 Async.SetMaxThreads("Queue Name", 10); 
 // Perform an action on the custom threadpool named: "Queue Name", when done call ImDone  
 Async.Queue("Queue Name", () => DoSomeThing(foo), () => ImDone(foo)); 

This has a few handy oveloads that allow you to queue delayed actions, and another to push in urgent jobs that skip to the front of the queue.

Sam Saffron
+1  A: 

This situation screams BlockingCollection loud and clear. Create a dedicated thread that watches the queue with its priority set approriately. The BlockingCollection.Take method will block automatically when there are no items in the queue.

public class Example
{
  private BlockingCollection<WorkItem> m_Queue = new BlockingCollection<WorkItem>();

  public event EventHandler<WorkItemEventArgs> WorkItemCompleted;

  public Example()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          WorkItem item = m_Queue.Take();
          // Add code to process the work item here.
          if (WorkItemCompleted != null)
          {
             WorkItemCompleted(this, new WorkItemEventArgs(item));
          }
        }
      });
    thread.IsBackground = true;
    thread.Priority = ThreadPriority.BelowNormal;
    thread.Start();
  }

  public void Add(WorkItem item)
  {
    m_Queue.Add(item);
  }

}
Brian Gideon