views:

596

answers:

4

I've created a custom thread pool utility, but there seems to be a problem that I cannot find.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace iWallpaper.S3Uploader
{
public class QueueManager<T>
{
    private readonly Queue queue = Queue.Synchronized(new Queue());
    private readonly AutoResetEvent res = new AutoResetEvent(true);
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
    private readonly Semaphore sem = new Semaphore(1, 4);
    private readonly Thread thread;
    private Action<T> DoWork;
    private int Num_Of_Threads;

    private QueueManager()
    {
        Num_Of_Threads = 0;
        maxThread = 5;
        thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
        thread.Start();

        //   log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
    }

    public int maxThread { get; set; }

    public static QueueManager<T> Instance
    {
        get { return Nested.instance; }
    }

    /// <summary>
    /// Executes a multithreaded operation over the items
    /// </summary>
    /// <param name="list">List of items to process</param>
    /// <param name="action">Action to run on each item</param>
    /// <param name="MaxThreads">Maximum number of threads</param>
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        maxThread = MaxThreads;
        DoWork = action;
        foreach (T item in list)
        {
            Add(item);
        }
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
    {
        ExecuteNoThread(list, action, 0);
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        foreach (T wallpaper in list)
        {
            action(wallpaper);
        }
    }
    /// <summary>
    /// Default 10 threads
    /// </summary>
    /// <param name="list"></param>
    /// <param name="action"></param>
    public void Execute(IEnumerable<T> list, Action<T> action)
    {
        Execute(list, action, 10);
    }

    private void Add(T item)
    {
        lock (queue)
        {
            queue.Enqueue(item);
        }
        res.Set();
    }

    private void Worker()
    {
        while (true)
        {
            if (queue.Count == 0)
            {
                res.WaitOne();
            }

            if (Num_Of_Threads < maxThread)
            {
                var t = new Thread(Proceed);
                t.Start();
            }
            else
            {
                res_thr.WaitOne();
            }
        }
    }

    private void Proceed()
    {
        Interlocked.Increment(ref Num_Of_Threads);
        if (queue.Count > 0)
        {
            var item = (T) queue.Dequeue();

            sem.WaitOne();
            ProceedItem(item);
            sem.Release();
        }
        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);
    }

    private void ProceedItem(T activity)
    {
        if (DoWork != null)
            DoWork(activity);

        lock (Instance)
        {
            Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                          thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                          Num_Of_Threads);
        }
    }

    #region Nested type: Nested

    protected class Nested
    {
        // Explicit static constructor to tell C# compiler
        // not to mark type as beforefieldinit
        static Nested()
        {
        }

        internal static readonly QueueManager<T> instance = new QueueManager<T>();
    }

    #endregion

}

}

Problem is here:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                      thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                      Num_Of_Threads);

The title always shows the same ONE thread ID, and the program seems to be running on a single thread.

Sample usage:

        var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
        QueueManager<int>.Instance.Execute(i_list,
          i =>
          {
              Console.WriteLine("Some action under element number {0}", i);

          }, 5);

P.S.: it's pretty messy, but I'm still working on it.

+1  A: 

You should probably use the built-in thread pool. When I ran your code, I noticed that you spin up a bunch of threads, but because the queue is still empty they just exit. This continues until the queue is actually populated, and then your next thread processes everything. Creating threads this way is very expensive. You should only spin up a thread if it has something to do.
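As a rough illustration of the built-in pool, here is a minimal sketch that queues one work item per element with ThreadPool.QueueUserWorkItem and waits for all of them. The class and method names (BuiltInPoolSketch, ForEachOnPool) are made up for this example, and it assumes .NET 4.0 or later because it uses CountdownEvent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the original question's code.
public static class BuiltInPoolSketch
{
    // Queues one work item per element on the shared .NET thread pool
    // and blocks the caller until every item has been processed.
    public static void ForEachOnPool<T>(IEnumerable<T> items, Action<T> action)
    {
        var list = new List<T>(items);
        if (list.Count == 0)
        {
            return; // nothing to queue, nothing to wait for
        }

        using (var done = new CountdownEvent(list.Count))
        {
            foreach (T item in list)
            {
                T captured = item; // private copy for the closure
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        action(captured);
                    }
                    finally
                    {
                        done.Signal(); // count down even if the action throws
                    }
                });
            }

            done.Wait(); // returns once Signal has been called for every item
        }
    }
}
```

Note that an exception escaping the action on a pool thread is still unhandled, so real code would want to catch and record it inside the work item.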

JoshBerke
The built-in thread pool would be great if it were not a static class and you could make multiple thread pool objects.
Justice
+5  A: 

I looked through your code and here are a couple of issues I saw.

  1. You lock the queue object even though it is a synchronized queue. This is unnecessary.
  2. You lock the queue object inconsistently. Either lock it for every access, or never lock it and rely on the Synchronized wrapper's behavior.
  3. The Proceed method is not thread-safe. These two lines are the issue:

        if (queue.Count > 0) {
          var item = (T)queue.Dequeue();
        ...
        }
    

    Using a synchronized queue only guarantees that individual accesses are safe, so neither .Count nor .Dequeue will corrupt the internal structure of the queue. The combination of the two is not atomic, however. Imagine two threads running these lines at the same time against a queue of count 1:

    • Thread1: if (...) -> true
    • Thread2: if (...) -> true
    • Thread1: dequeue -> succeeds
    • Thread2: dequeue -> fails because the queue is empty
  4. There is a race condition between Worker and Proceed that can lead to deadlock. The following two lines of code should be switched.

    Code:

        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);

    The first line unblocks the Worker method. If Worker runs quickly enough, it will go back through the loop, see that Num_Of_Threads has not been decremented yet (so it is still not less than maxThread), and go right back into res_thr.WaitOne(). If no other threads are currently running, nothing will signal res_thr again and the code deadlocks. This is very easy to hit with a low maximum thread count (say 1). Swapping these two lines of code should fix the issue.

  5. The maxThread property does not seem to be useful beyond 4. The sem object is created with a maximum count of 4, and all code that actually executes an item must go through this semaphore, so at most 4 items can run concurrently regardless of how high maxThread is set. Note also that new Semaphore(1, 4) starts with an initial count of 1, so as long as every WaitOne is paired with a Release, only one item runs at a time.
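To make the race in item 3 concrete, here is a minimal sketch of one way to close it: do the emptiness check and the dequeue under a single lock so no other thread can slip in between. SafeWorkQueue, TryDequeue and gate are names invented for this sketch, and it uses a plain generic Queue<T> rather than the Synchronized wrapper from the question.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper, not the asker's class.
public class SafeWorkQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object gate = new object(); // private lock object

    public void Enqueue(T item)
    {
        lock (gate)
        {
            queue.Enqueue(item);
        }
    }

    // The Count check and the Dequeue happen under the same lock,
    // so two threads can never both "win" the last item.
    public bool TryDequeue(out T item)
    {
        lock (gate)
        {
            if (queue.Count > 0)
            {
                item = queue.Dequeue();
                return true;
            }
        }

        item = default(T);
        return false;
    }
}
```

A worker would then loop on TryDequeue and stop (or wait) when it returns false, instead of testing Count and calling Dequeue as two separate steps.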
JaredPar
+4  A: 

Writing robust threaded code is not trivial. There are numerous thread-pools around that you might look at for reference, but also note that Parallel Extensions (available as CTP, or later in .NET 4.0) includes a lot of additional threading constructs out-of-the-box (in the TPL/CCR). For example, Parallel.For / Parallel.ForEach, which deal with work-stealing, and handling the available cores effectively.

For an example of a pre-rolled thread-pool, see Jon Skeet's CustomThreadPool here.
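For comparison, a minimal sketch of what the asker's Execute method might look like on top of Parallel.ForEach, assuming .NET 4.0 or later. ParallelSketch and its Execute method are names made up for this example; ParallelOptions.MaxDegreeOfParallelism plays the role of the maxThread setting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, shown only to illustrate the TPL approach.
public static class ParallelSketch
{
    // Runs the action over every item using at most maxThreads
    // concurrent workers, and returns when all items are done.
    public static void Execute<T>(IEnumerable<T> items, Action<T> action, int maxThreads)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };
        Parallel.ForEach(items, options, action);
    }
}
```

Exceptions thrown by the action surface to the caller wrapped in an AggregateException, so no hand-written queue, semaphore, or reset events are needed for the basic case.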

Marc Gravell
Don't forget SmartThreadPool which is quite nice.
sixlettervariables
Thanks for the useful links and pointers ^_^
AlfeG
+2  A: 

I think you can simplify things considerably.

Here is a modified form (I didn't test the modifications) of the thread pool I use:

The only synchronization primitive you need is a Monitor, locked on a private lock object inside the thread pool. You don't need a semaphore or the reset events.

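using System;
using System.Collections.Generic;
using System.Threading;

internal class ThreadPool
{
    private readonly Thread[] m_threads;
    private readonly Queue<Action> m_queue;
    private readonly object m_lockObj;
    private bool m_shutdown;

    // Raised while the lock is held, when Shutdown begins.
    public event Action OnShuttingDown;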


    public ThreadPool(int numberOfThreads)
    {
        if (numberOfThreads <= 0)
        {
            throw new ArgumentOutOfRangeException("numberOfThreads", "Invalid thread count!");
        }
        m_queue = new Queue<Action>();
        m_threads = new Thread[numberOfThreads];
        m_lockObj = new object();

        lock (m_lockObj)
        {
            for (int i = 0; i < numberOfThreads; ++i)
            {
                m_threads[i] = new Thread(ThreadLoop);
                m_threads[i].Start();
            }
        }

    }

    public void Shutdown()
    {
        lock (m_lockObj)
        {
            m_shutdown = true;
            Monitor.PulseAll(m_lockObj);

            if (OnShuttingDown != null)
            {
                OnShuttingDown();
            }
        }
        foreach (var thread in m_threads)
        {
            thread.Join();
        }
    }
    public void Enqueue(Action a)
    {
        lock (m_lockObj)
        {
            m_queue.Enqueue(a);
            Monitor.Pulse(m_lockObj);
        }
    }

    private void ThreadLoop()
    {
        Monitor.Enter(m_lockObj);

        while (!m_shutdown)
        {
            if (m_queue.Count == 0)
            {
                Monitor.Wait(m_lockObj);
            }
            else
            {
                var a = m_queue.Dequeue();
                Monitor.Pulse(m_lockObj);
                Monitor.Exit(m_lockObj);
                try
                {
                    a();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("An unhandled exception occurred!\n:{0}", ex.Message);
                }
                Monitor.Enter(m_lockObj);
            }
        }

        Monitor.Exit(m_lockObj);
    }
}
Scott Wisniewski
Never lock "this" :). Otherwise I can mess up your algorithm by locking externally. Yes, it's very bad practice to do so, but how many times do you see people lock on an instance object?
JaredPar
I see your point. You can change the object being locked to an internal field, and that'll get fixed. For me, it's not a problem, because I control all the uses of the thread pool. For general code, or 700K LOC projects, I could see it being a problem. I'll update the post.
Scott Wisniewski
Alrighty... I updated it to lock on m_lockObj instead....
Scott Wisniewski
ThreadLoop uses Monitor.Enter(this)
JaredPar
No it doesn't :)
Scott Wisniewski
(meaning I edited it)
Scott Wisniewski
There is one potential write ordering issue. Even though m_shutdown is set before Monitor.PulseAll() is run, there is no guarantee this write will be visible on all threads. So it's possible this could cause a deadlock during shutdown as the ThreadLoop would go straight back to waiting.
JaredPar
I'm not 100% sure on this point though. I don't know if there are memory barriers introduced by PulseAll().
JaredPar
Yes, you are right. The correct behavior is to use Interlocked.Exchange there.
Scott Wisniewski
Actually... I'm not sure if Interlocked.Exchange is needed. If it were, then Monitor.Pulse would be pretty useless.
Scott Wisniewski
I'm trying to dig up the documentation that says where memory barriers are inserted and how this applies to this situation. Might take a week :(
JaredPar
I'll try and debug into Monitor.Pulse and see what it does.
Scott Wisniewski
All I can see with the public reference sources is that it calls the internal method ObjPulse. I don't think I can see the C++ sources.
Scott Wisniewski
I checked the ECMA spec. Section 12.6.5 says that entering and exiting a monitor should introduce a memory barrier. I think this means I might be OK...
Scott Wisniewski