I'm working on a multi-threaded scraper for a website and as per a different question I've decided to use the ThreadPool with QueueUserWorkItem().

How can I continually queue work items without queuing them all at once? I need to queue > 300k items (one for each userID), and if I loop to queue them all at once I'll run out of memory.

So, what I would like is:

// 1 = startUserID, 300000 = endUserID, 25 = MaxThreads  
Scraper webScraper = new Scraper(1, 300000, 25); 

webScraper.Start();  
// return immediately while webScraper runs in the background

During this time, webScraper is continuously adding all 300,000 work items as threads become available.

Here is what I have so far:

    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private static int CurrentUserID { get; set; }
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();


        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int CurrentUserID, int MaxUserID, int MaxThreads)
        {
            this.CurrentUserID = CurrentUserID;
            this.MaxUserID = MaxUserID;
            this.MaxThreads = MaxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(MaxThreads, MaxThreads);
        }

        public void Start()
        {
            int availableThreads;

            // Need to start a new thread to spawn the new WorkItems so Start() will return right away?
            while (Running)
            {

                // if (!CurrentUserID >= MaxUserID)
                // {
                //     while (availableThreads > 0)
                //     {
                //         ThreadPool.QueueUserWorkItem(new WaitCallBack(Process));
                //     }
                // }
                // else
                // { Running = false; }
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public static void Process(object state)
        {
             var userID = Interlocked.Increment(ref CurrentUserID);
             ... Fetch Stats for userID
        }
    }

Is this the right approach?

Can anyone point me in the right direction for creating my work items in the background once Start() is called, without creating all the work items at once?

+1  A: 

Would this be better implemented with fewer work items that steal work from a shared queue? Just because you have 300,000 pieces of work to do doesn't mean you need 300,000 workers to do it. Since you only have a few cores, only a few of these pieces of work can actually happen in parallel, so why not hand out chunks of work to far fewer workers?

Depending on how consistent the time taken for each piece of work is, you can either split it all evenly across the workers up front, or have a central queue (which you'll have to lock around) that each worker grabs more work from as it runs out.
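
To illustrate the central-queue idea, here is a rough sketch with a fixed set of worker threads pulling user IDs from one shared, locked queue (QueueScraper and FetchStats are illustrative names, not from the question):

    using System.Collections.Generic;
    using System.Threading;

    public class QueueScraper
    {
        private readonly Queue<int> workQueue = new Queue<int>();
        private readonly object queueLock = new object();

        public QueueScraper(int startUserID, int endUserID)
        {
            // 300k ints is only a few megabytes, so holding the ID list itself is cheap
            for (int id = startUserID; id <= endUserID; id++)
            {
                workQueue.Enqueue(id);
            }
        }

        public void Start(int workerCount)
        {
            // Start() returns immediately; the background workers drain the queue
            for (int i = 0; i < workerCount; i++)
            {
                Thread worker = new Thread(Work);
                worker.IsBackground = true;
                worker.Start();
            }
        }

        private void Work()
        {
            while (true)
            {
                int userID;
                lock (queueLock)
                {
                    if (workQueue.Count == 0)
                    {
                        return;              // queue drained, this worker exits
                    }
                    userID = workQueue.Dequeue();
                }
                FetchStats(userID);          // placeholder for the real scraping call
            }
        }

        private void FetchStats(int userID)
        {
            // ... fetch and parse stats for userID ...
        }
    }

Usage would then just be something like new QueueScraper(1, 300000).Start(25);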

EDIT:

Joe Duffy seems to have a series on writing a work-stealing queue here: http://www.bluebytesoftware.com/blog/2008/08/12/BuildingACustomThreadPoolSeriesPart2AWorkStealingQueue.aspx. It also looks like .NET 4's ThreadPool is going to be a bit smarter. But I don't think you need anything particularly complex for this scenario.

Niall Connaughton
A: 

Creating a queue of queued items doesn't seem quite right somehow, so how about making the work items queue themselves again after they've finished?

Your Start method could queue up, say, 3 × MaxThreads items (75 in your example), and then your Process method queues another work item when it finishes. That way Start returns quickly but fires off a number of work items, which then keep re-queuing themselves:


    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private int currentUserID;
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();

        private int Multiplier { get; set; }

        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int currentUserID, int maxUserID, int maxThreads)
        {
            this.currentUserID = currentUserID;
            this.MaxUserID = maxUserID;
            this.MaxThreads = maxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(maxThreads, maxThreads);
            Multiplier = 3;
        }

        public void Start()
        {
            Running = true;
            for (int i = 0; i < MaxThreads * Multiplier; i++)
            {
                ThreadPool.QueueUserWorkItem(Process);
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public void Process(object state)
        {
            if (Running == false)
            {
                return;
            }
            // Use the value returned by Interlocked.Increment so each work item
            // parses a unique userID, even if several threads race past the end check
            int userID = Interlocked.Increment(ref currentUserID);
            if (userID <= MaxUserID)
            {
                //Parse stats for userID
                ThreadPool.QueueUserWorkItem(Process);
            }
            else
            { Running = false; }
        }
    }

I'm sure the Running flag should really be set in a thread-safe way (marked volatile, or accessed via Interlocked) for safety. I've made the multiplier into a property, which could be passed to the constructor - I'm fairly sure it could be adjusted to tweak performance, depending on how long the stats take to parse.

Daniel Ives
A: 

I definitely wouldn't use ThreadPool.SetMaxThreads - remember that the thread pool is shared between all processes - setting the maximum number of threads would simply kill performance. The whole idea behind the thread pool is that you don't need to specify things like the maximum number of threads - the .NET framework figures out the optimum number of threads to allocate, so you don't need to do it.

Note that queuing 300,000 items would not cause 300,000 threads to spawn - the ThreadPool class manages the number of threads for you and re-uses threads as necessary. If you are simply worried that too many resources will be consumed this way, I would recommend refining your process - perhaps create a 'Spawner' class which in turn runs the scraper over chunks of, say, 1,000 IDs at a time?
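
As a rough illustration of that refinement, queuing one work item per chunk of IDs keeps the number of queued items small (ChunkedScraper, ChunkSize and FetchStats are illustrative names, not the asker's code):

    using System;
    using System.Threading;

    public class ChunkedScraper
    {
        private const int ChunkSize = 1000;

        public void Start(int startUserID, int endUserID)
        {
            for (int chunkStart = startUserID; chunkStart <= endUserID; chunkStart += ChunkSize)
            {
                int first = chunkStart;
                int last = Math.Min(chunkStart + ChunkSize - 1, endUserID);

                // 300 chunks of 1,000 IDs instead of 300,000 individual work items
                ThreadPool.QueueUserWorkItem(delegate
                {
                    for (int userID = first; userID <= last; userID++)
                    {
                        FetchStats(userID);   // placeholder for the real work
                    }
                });
            }
        }

        private void FetchStats(int userID)
        {
            // ... fetch and parse stats for userID ...
        }
    }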

Jaco Pretorius
Your first paragraph is incorrect. Per MSDN (http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx), there is one thread pool per process.
Matt Davis
Thanks for the update - it seems I was misinformed
Jaco Pretorius
A: 

It looks like you need a master control class that governs how many workers are firing off and keeps the queue full.

You could work with two queues then:

  1. One to hold all the items you need to scrape
  2. A second to hold the work currently being processed

This master/governor object would then keep looping until all the items from queue #1 are gone, adding work to queue #2 whenever you have cycles available.
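
Here is a hedged sketch of that governor idea, using a semaphore to cap how many work items are in flight at once; all names (GovernedScraper, FetchStats, maxInFlight) are illustrative:

    using System.Threading;

    public class GovernedScraper
    {
        private readonly Semaphore slots;

        public GovernedScraper(int maxInFlight)
        {
            slots = new Semaphore(maxInFlight, maxInFlight);
        }

        public void Start(int startUserID, int endUserID)
        {
            // The governor loop itself runs in the background so Start() returns at once
            ThreadPool.QueueUserWorkItem(delegate
            {
                for (int userID = startUserID; userID <= endUserID; userID++)
                {
                    slots.WaitOne();          // block while maxInFlight items are busy
                    int id = userID;
                    ThreadPool.QueueUserWorkItem(delegate
                    {
                        try { FetchStats(id); }          // placeholder for the real scraping
                        finally { slots.Release(); }     // free a slot for the governor
                    });
                }
            });
        }

        private void FetchStats(int userID)
        {
            // ... fetch and parse stats for userID ...
        }
    }

With maxInFlight set to something like 25, the pool's queue never grows beyond a couple of dozen entries, which addresses the memory concern in the question.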

Brett Veenstra
A: 

I'm curious: what is it you want to achieve here? You want to do something with each userID? Why not run your userID loop in one single thread - e.g. using a BackgroundWorker, or QueueUserWorkItem for that matter?
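
A minimal sketch of that single-background-thread approach (SingleThreadScraper and FetchStats are made-up names):

    using System.Threading;

    public class SingleThreadScraper
    {
        public void Start(int startUserID, int endUserID)
        {
            // One work item loops over every ID; Start() itself returns immediately
            ThreadPool.QueueUserWorkItem(delegate
            {
                for (int userID = startUserID; userID <= endUserID; userID++)
                {
                    FetchStats(userID);   // each fetch runs one after another
                }
            });
        }

        private void FetchStats(int userID)
        {
            // ... fetch and parse stats for userID ...
        }
    }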

Torben Rahbek Koch
A: 

You can use a different thread pool. Here is one: http://www.codeplex.com/smartthreadpool. It allows you to queue up all your items at once, and you can assign a maximum number of threads to create. Say you have 1,000 work items and you assign 100 threads: it immediately takes the first 100 items and gets them going while the rest wait. As soon as one of those items is done and a thread frees up, the next queued item is started. It manages all the work but won't saturate threads or memory. Also, it doesn't use threads from the .NET thread pool.
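
For reference, a sketch of what using that library might look like; the constructor overload and method names here are from memory, so check them against the project's documentation, and FetchStats is a placeholder:

    using Amib.Threading;   // namespace of the SmartThreadPool library (check the docs)

    public class SmartPoolScraper
    {
        public void Run(int startUserID, int endUserID, int maxThreads)
        {
            // idle timeout in milliseconds plus a cap on worker threads
            // (constructor overload assumed from memory)
            SmartThreadPool pool = new SmartThreadPool(10000, maxThreads);

            for (int userID = startUserID; userID <= endUserID; userID++)
            {
                // every item can be queued up front; only maxThreads run at a time
                pool.QueueWorkItem(new WorkItemCallback(FetchStats), userID);
            }

            pool.WaitForIdle();   // block until all queued items have been processed
            pool.Shutdown();
        }

        private object FetchStats(object state)
        {
            int userID = (int)state;
            // ... fetch and parse stats for userID ...
            return null;
        }
    }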

Matthew