I've created a custom thread pool utility, but there seems to be a problem that I cannot find.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace iWallpaper.S3Uploader
{
public class QueueManager<T>
{
private readonly Queue queue = Queue.Synchronized(new Queue());
private readonly AutoResetEvent res = new AutoResetEvent(true);
private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
private readonly Semaphore sem = new Semaphore(1, 4);
private readonly Thread thread;
private Action<T> DoWork;
private int Num_Of_Threads;
private QueueManager()
{
Num_Of_Threads = 0;
maxThread = 5;
thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
thread.Start();
// log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
}
public int maxThread { get; set; }
public static FileUploadQueueManager<T> Instance
{
get { return Nested.instance; }
}
/// <summary>
/// Executes multythreaded operation under items
/// </summary>
/// <param name="list">List of items to proceed</param>
/// <param name="action">Action under item</param>
/// <param name="MaxThreads">Maximum threads</param>
public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
maxThread = MaxThreads;
DoWork = action;
foreach (T item in list)
{
Add(item);
}
}
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
{
ExecuteNoThread(list, action, 0);
}
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
foreach (T wallpaper in list)
{
action(wallpaper);
}
}
/// <summary>
/// Default 10 threads
/// </summary>
/// <param name="list"></param>
/// <param name="action"></param>
public void Execute(IEnumerable<T> list, Action<T> action)
{
Execute(list, action, 10);
}
private void Add(T item)
{
lock (queue)
{
queue.Enqueue(item);
}
res.Set();
}
private void Worker()
{
while (true)
{
if (queue.Count == 0)
{
res.WaitOne();
}
if (Num_Of_Threads < maxThread)
{
var t = new Thread(Proceed);
t.Start();
}
else
{
res_thr.WaitOne();
}
}
}
private void Proceed()
{
Interlocked.Increment(ref Num_Of_Threads);
if (queue.Count > 0)
{
var item = (T) queue.Dequeue();
sem.WaitOne();
ProceedItem(item);
sem.Release();
}
res_thr.Set();
Interlocked.Decrement(ref Num_Of_Threads);
}
private void ProceedItem(T activity)
{
if (DoWork != null)
DoWork(activity);
lock (Instance)
{
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
}
}
#region Nested type: Nested
protected class Nested
{
// Explicit static constructor to tell C# compiler
// not to mark type as beforefieldinit
internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
}
#endregion
}
}
Problem is here:
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
There is always ONE thread id in title. And program seems to be working in one thread.
Sample usage:
var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
QueueManager<int>.Instance.Execute(i_list,
i =>
{
Console.WriteLine("Some action under element number {0}", i);
}, 5);
P.S.: it's pretty messy, but I'm still working on it.