Hello,

I have a List with items that I want to download. I use a for loop to iterate over the list.

For each item in this list I start a new thread that references the item. My problem is that I want to limit the maximum number of downloads running at the same time.

for (int i = downloadList.Count - 1; i >= 0; i--)
{
    downloadItem item = downloadList[i];
    if (item.Status != 1 && item.Status != 2)
    {
        ThreadStart starter = delegate { this.DownloadItem(ref item); };
        Thread t = new Thread(starter);
        t.IsBackground = true;
        t.Name = item.Name;
        t.Priority = ThreadPriority.Normal;
        t.Start();
    }
}

I read something about the ThreadPool, but then I can't reference my item. Can someone help me? Thanks! :)

Edit:

I tested this:

ThreadPool.SetMaxThreads(maxDownloads, maxDownloads);
ThreadPool.SetMinThreads(maxDownloads, maxDownloads);
ThreadPool.QueueUserWorkItem(DownloadItem, ref item);

I don't know how I can reference my downloadItem with this thread...

A: 

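I can't see why you are trying to use the ref keyword anyway. Assuming downloadItem is a class, it is a reference type: the method receives a copy of the reference and works on the same object, so any changes DownloadItem makes to the item are visible to the caller. ref would only matter if DownloadItem assigned a different object to the parameter, and in your original code you are not using item after it is passed to DownloadItem. Therefore I would suggest using the ThreadPool methods you tried, but without a ref parameter.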

Hope that helps.
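
A minimal sketch of this suggestion, assuming downloadItem is a class and that DownloadItem can be given the void (object) signature that the WaitCallback delegate requires. The downloadItem fields, the PoolDownloader wrapper and the meaning of Status == 2 are hypothetical stand-ins, not taken from the question:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the asker's downloadItem class.
class downloadItem
{
    public string Name;
    public int Status;
}

static class PoolDownloader
{
    // Matches the WaitCallback delegate: void (object state).
    static void DownloadItem(object state)
    {
        // Same object the caller queued; no ref needed.
        downloadItem item = (downloadItem)state;
        // ... the actual download would go here ...
        item.Status = 2; // assumption: 2 means "finished"
    }

    public static void QueueAll(List<downloadItem> downloadList)
    {
        for (int i = downloadList.Count - 1; i >= 0; i--)
        {
            downloadItem item = downloadList[i];
            if (item.Status != 1 && item.Status != 2)
            {
                // The item travels as the state argument of the work item.
                ThreadPool.QueueUserWorkItem(DownloadItem, item);
            }
        }
    }
}
```

Queuing work items this way does not cap concurrency by itself. ThreadPool.SetMaxThreads applies to the whole process and cannot be set below the number of processors, so it is a blunt tool for limiting downloads.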

Kieren Johnstone
+2  A: 

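If you're using .NET 4, I'd strongly suggest using Parallel.ForEach (potentially on the reversed list). Note that if downloadList is a List<T>, downloadList.Reverse() reverses the list in place and returns void, so use Enumerable.Reverse(downloadList) from System.Linq instead.

So, something like:

Parallel.ForEach(Enumerable.Reverse(downloadList),
                 new ParallelOptions { MaxDegreeOfParallelism = 8 },
                 item => this.DownloadItem(item));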

If you don't want the calling thread to block, you could QueueUserWorkItem this call, of course.
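
As a rough sketch of that last point, the blocking Parallel.ForEach call can be wrapped in a thread-pool work item so the caller returns immediately. BackgroundParallel, StartDownloads, the download delegate and the onFinished callback are hypothetical names introduced for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class BackgroundParallel
{
    // Returns immediately; the parallel loop runs on a thread-pool thread.
    public static void StartDownloads<T>(
        IList<T> downloadList, int maxDownloads,
        Action<T> download, Action onFinished)
    {
        ThreadPool.QueueUserWorkItem(state =>
        {
            // Blocks this pool thread (not the caller) until all items are done.
            Parallel.ForEach(
                Enumerable.Reverse(downloadList),
                new ParallelOptions { MaxDegreeOfParallelism = maxDownloads },
                download);

            // Optional notification once every item has been processed.
            if (onFinished != null) onFinished();
        });
    }
}
```

A call might look like BackgroundParallel.StartDownloads(downloadList, 8, item => DownloadItem(item), null), assuming DownloadItem takes the item without ref.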

James Manning
If you're on .NET 4 and don't want to use Parallel.ForEach, then the new Task stuff is also worth a look, as it solves a lot of the problems with using the ThreadPool directly.
Will Dean
+1  A: 

I solved this very problem in .NET 3.5 by creating threads and loading them into a queue. Then I read a thread from the queue, start it, and increment the running thread count. I keep doing this until I hit the upper limit.

As each thread finishes it invokes a callback method that decrements the running count and signals the queue reader to start more threads. For additional control you can use a dictionary to keep track of running threads, keyed by ManagedThreadId, so you can signal threads to stop early or report progress.

Sample console app:

using System;
using System.Collections.Generic;
using System.Threading;

namespace ThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supervisor = new Supervisor();
            supervisor.LaunchThreads();
            Console.ReadLine();
            supervisor.KillActiveThreads();
            Console.ReadLine();
        }

        public delegate void WorkerCallbackDelegate(int threadIdArg);
        public static object locker = new object();

        class Supervisor
        {
            Queue<Thread> pendingThreads = new Queue<Thread>();
            Dictionary<int, Worker> activeWorkers = new Dictionary<int, Worker>();

            public void LaunchThreads()
            {
                for (int i = 0; i < 20; i++)
                {
                    Worker worker = new Worker();
                    worker.DoneCallBack = new WorkerCallbackDelegate(WorkerCallback);
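                    Thread thread = new Thread(worker.DoWork);
                    thread.IsBackground = true;
                    // Register the worker before starting the thread, so the
                    // done callback can never run before the entry exists.
                    lock (locker)
                    {
                        activeWorkers.Add(thread.ManagedThreadId, worker);
                    }
                    thread.Start();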
                }
            }

            public void KillActiveThreads()
            {
                lock (locker)
                {
                    foreach (Worker worker in activeWorkers.Values)
                    {
                        worker.StopWork();
                    }
                }
            }

            public void WorkerCallback(int threadIdArg)
            {
                lock (locker)
                {
                    activeWorkers.Remove(threadIdArg);
                    if (activeWorkers.Count == 0)
                    {
                        Console.WriteLine("no more active threads");
                    }
                }
            }
        }

        class Worker
        {
            public WorkerCallbackDelegate DoneCallBack { get; set; }
            volatile bool quitEarly;

            public void DoWork()
            {
                quitEarly = false;
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " started");
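                DateTime startTime = DateTime.Now;
                // Pick the simulated duration once instead of re-rolling it on every loop check.
                int secondsToRun = new Random(Thread.CurrentThread.ManagedThreadId).Next(1, 10);
                while (!quitEarly && ((DateTime.Now - startTime).TotalSeconds < secondsToRun))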
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " stopped");
                DoneCallBack(Thread.CurrentThread.ManagedThreadId);
            }

            public void StopWork()
            {
                quitEarly = true;
            }
        }
    }
}
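
The sample above starts all 20 threads immediately and never uses pendingThreads, so it shows the tracking and callback parts but not the throttling. A minimal sketch of the queue-and-callback throttling described at the top of this answer might look like the following. ThrottledLauncher and its members are hypothetical names, not part of the original sample:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Starts queued threads so that at most maxRunning are running at once.
class ThrottledLauncher
{
    readonly object locker = new object();
    readonly Queue<Thread> pendingThreads = new Queue<Thread>();
    readonly int maxRunning;
    int runningCount;

    public ThrottledLauncher(int maxRunning)
    {
        this.maxRunning = maxRunning;
    }

    // Wraps the work so the done callback always runs, then queues the thread.
    public void Enqueue(Action work)
    {
        Thread thread = new Thread(() =>
        {
            try { work(); }
            finally { OnWorkerDone(); }
        });
        thread.IsBackground = true;
        lock (locker)
        {
            pendingThreads.Enqueue(thread);
            StartPending();
        }
    }

    // Called by each thread as it finishes: free a slot and start more.
    void OnWorkerDone()
    {
        lock (locker)
        {
            runningCount--;
            StartPending();
        }
    }

    // Must be called while holding the lock.
    void StartPending()
    {
        while (runningCount < maxRunning && pendingThreads.Count > 0)
        {
            runningCount++;
            pendingThreads.Dequeue().Start();
        }
    }
}
```

For the downloads in the question, each item would be queued with something like launcher.Enqueue(() => DownloadItem(item)), with the launcher constructed as new ThrottledLauncher(maxDownloads).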
ebpower
Can you give me a bit of this code? Thanks.
Werewolve
Added sample code to answer.
ebpower
A: 

The best way to handle this is to create only maxDownloads threads. Put all of your work items into a queue and let the threads compete with each other to figure out which one processes each work item.

// ConcurrentQueue<T> lives in System.Collections.Concurrent (.NET 4).
var queue = new ConcurrentQueue<downloadItem>(downloadList);
// Fix the thread count up front; queue.Count shrinks as workers dequeue.
int threadCount = Math.Min(maxDownloads, queue.Count);
for (int i = 0; i < threadCount; i++)
{
  var thread = new Thread(
    () =>
    {
      while (true)
      {
        downloadItem item = null;
        if (queue.TryDequeue(out item))
        {
          // Process the next work item.
          DownloadItem(item);
        }
        else
        {
          // No more work items are left.
          break;
        }
      }
    });
  thread.IsBackground = true;
  thread.Start();
}

You could also use a semaphore to throttle the number of threads processing work items. This is especially useful when the actual number of threads is unknown as would be the case if you were using the ThreadPool.

var semaphore = new Semaphore(maxDownloads, maxDownloads);
for (int i = 0; i < downloadList.Count; i++)
{
  downloadItem item = downloadList[i];
  ThreadPool.QueueUserWorkItem(
    (state) =>
    {
      semaphore.WaitOne();
      try
      {
        DownloadItem(item);
      }
      finally
      {
        semaphore.Release();
      }
    });
}

I am not particularly fond of either approach. The problem with the first is that the number of threads created is not fixed; it is generally advised to avoid creating threads in a for loop, as that tends not to scale well. The problem with the second is that the semaphore will block some of the ThreadPool threads. That is not advised either, because you are effectively claiming one of the pool's threads and then doing nothing on it, which might affect the performance of other unrelated tasks that happen to be sharing the ThreadPool. I think in this case either of the two options will be fine, since crafting a more scalable pattern is more work than it is worth.

Brian Gideon