Greetings.

I'm trying to implement some multithreaded code in an application. The purpose of this code is to validate items that the database gives it. Validation can take quite a while (a few hundred ms to a few seconds), so this process needs to be forked off into its own thread for each item.

The database may give it 20 or 30 items a second in the beginning, but that begins to decline rapidly, eventually reaching about 65K items over 24 hours, at which point the application exits.

I'd like it if anyone more knowledgeable could take a peek at my code and see if there are any obvious problems. No one I work with knows multithreading, so I'm really on my own on this one.

Here's the code. It's kinda long but should be pretty clear. Let me know if you have any feedback or advice. Thanks!

public class ItemValidationService
{
    /// <summary>
    /// The object to lock on in this class, for multithreading purposes.
    /// </summary>
    private static object locker = new object();

    /// <summary>Items that have been validated.</summary>
    private HashSet<int> validatedItems;

    /// <summary>Items that are currently being validated.</summary>
    private HashSet<int> validatingItems;

    /// <summary>Remove an item from the index if its links are bad.</summary>
    /// <param name="id">The ID of the item.</param>
    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if
            (
                !this.validatedItems.Contains(id) &&
                !this.validatingItems.Contains(id)
            ){
                ThreadPool.QueueUserWorkItem(sender =>
                {
                    this.Validate(id);
                });
            }
        }

    } // method

    private void Validate(int itemId)
    {
        lock (locker)
        {
            this.validatingItems.Add(itemId);
        }

        // *********************************************
        // Time-consuming routine to validate an item...
        // *********************************************

        lock (locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }

    } // method

} // class
+4  A: 

The thread pool is a convenient choice if you have lightweight, sporadic processing that isn't time-sensitive. However, I recall reading on MSDN that it's not appropriate for large-scale processing of this nature.

I used it for something quite similar to this and regret it. I took a worker-thread approach in subsequent apps and am much happier with the level of control I have.

My favorite pattern in the worker-thread model is to create a master thread which holds a queue of task items, then fork a bunch of workers that pop items off that queue to process. I use a blocking queue so that when there are no items to process, the workers just block until something is pushed onto the queue. In this model, the master thread produces work items from some source (db, etc.) and the worker threads consume them.
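A minimal sketch of that producer/consumer arrangement, assuming .NET 4 or later so that `BlockingCollection<T>` is available. The class name `WorkerQueueSketch`, the `validate` callback, and the capacity of 100 are made up for illustration; here the calling thread plays the master role.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class WorkerQueueSketch
{
    // Runs `validate` (hypothetical callback) on every id using a fixed
    // number of worker threads. The calling thread acts as the producer.
    public static void Run(IEnumerable<int> ids, int workerCount, Action<int> validate)
    {
        // Bounded (100 is an arbitrary choice) so the producer blocks
        // instead of queueing without limit.
        using (var queue = new BlockingCollection<int>(100))
        {
            var workers = new List<Thread>();
            for (int i = 0; i < workerCount; i++)
            {
                var worker = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding has been called and the queue is drained.
                    foreach (int id in queue.GetConsumingEnumerable())
                    {
                        validate(id);
                    }
                });
                worker.IsBackground = true;
                worker.Start();
                workers.Add(worker);
            }

            // Producer side: push work items from some source (db, etc.).
            foreach (int id in ids)
            {
                queue.Add(id);
            }
            queue.CompleteAdding(); // Signal that no more items are coming.

            // Wait for the workers to finish what is left in the queue.
            foreach (Thread worker in workers)
            {
                worker.Join();
            }
        }
    }
}
```

Because `Run` only returns after every worker has joined, it also gives you a natural "all items are done" point for shutting the application down.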

Michael Haren
A: 

I would be concerned about performance here. You indicated that the database may give it 20-30 items per second and an item could take up to a few seconds to be validated. That could be quite a large number of threads -- using your metrics, worst case 60-90 threads! I think you need to reconsider the design here. Michael mentioned a nice pattern. The use of the queue really helps keep things under control and organized. A semaphore could also be employed to control the number of threads created -- i.e. you could have a maximum number of threads allowed, but under smaller loads you wouldn't necessarily have to create the maximum number if fewer ended up getting the job done. In other words, your own pool size could be dynamic with a cap.
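One way the semaphore idea might look, as a rough sketch rather than a tuned implementation. It assumes .NET 4 or later for `SemaphoreSlim`; the class `ThrottledValidator` and its `validate` callback are hypothetical names. Note that the cap applies to items that are queued or running, which is slightly stricter than a cap on running threads.

```csharp
using System;
using System.Threading;

public sealed class ThrottledValidator : IDisposable
{
    private readonly int maxConcurrent;
    private readonly SemaphoreSlim slots;
    private readonly Action<int> validate; // hypothetical validation callback

    public ThrottledValidator(int maxConcurrent, Action<int> validate)
    {
        this.maxConcurrent = maxConcurrent;
        this.slots = new SemaphoreSlim(maxConcurrent, maxConcurrent);
        this.validate = validate;
    }

    // Blocks the caller while maxConcurrent items are already queued or running.
    public void Submit(int id)
    {
        this.slots.Wait();
        bool queued = false;
        try
        {
            queued = ThreadPool.QueueUserWorkItem(_ =>
            {
                try { this.validate(id); }
                finally { this.slots.Release(); } // Free the slot when done.
            });
        }
        finally
        {
            // If the item was never queued, nothing else will release the slot.
            if (!queued) this.slots.Release();
        }
        if (!queued) throw new InvalidOperationException("Could not queue item " + id);
    }

    // Waits for in-flight work by taking every slot, then hands them back.
    // Not meant to be called while other threads are still submitting.
    public void WaitForAll()
    {
        for (int i = 0; i < this.maxConcurrent; i++) this.slots.Wait();
        this.slots.Release(this.maxConcurrent);
    }

    public void Dispose()
    {
        this.slots.Dispose();
    }
}
```

Under light load fewer than `maxConcurrent` items are in flight at once, so the cap only bites during bursts such as the 20-30 items per second at startup.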

When using the thread pool, I also find it more difficult to monitor the pool's threads while they are doing the work. So, unless it's fire and forget, I am in favor of more controlled execution. I know you mentioned that your app exits after the 65K items are all completed. How are you monitoring your threads to determine whether they have completed their work -- i.e. that all queued workers are done? Are you monitoring the status of all items in the HashSets? I think by queuing your items up and having your own worker threads consume off that queue, you can gain more control. Admittedly, this can come at the cost of more overhead in terms of signaling between threads to indicate when all items have been queued, allowing them to exit.

Peter Meyer
I'm adding each item to the ThreadPool to let it manage how many threads to actually run at once. I'm not just creating a new thread for each item and starting it.
Chris
Good point. I would just be concerned about maxing out the thread pool since it's used elsewhere by the CLR; however, if this is all the app is doing ...
Peter Meyer
The only other thing I would bring up is, do you know if QueueUserWorkItem can handle queuing that large a number of items? Does it have limits? What happens if QueueUserWorkItem returns false?
Peter Meyer
Peter: I have the same questions.
Chris
+1  A: 

Be careful, QueueUserWorkItem might fail

Mauricio Scheffer
+2  A: 

I second the idea of using a blocking queue and worker threads. Here is a blocking queue implementation that I've used in the past with good results: http://www.codeproject.com/KB/recipes/boundedblockingqueue.aspx

What's involved in your validation logic? If it's mainly CPU-bound, then I would create no more than one worker thread per processor/core on the box. This will tell you the number of processors: Environment.ProcessorCount

If your validation involves I/O, such as file access or database access, then you could use a few more threads than the number of processors.
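As a small illustration of that sizing rule: the helper `WorkerSizing.WorkerCount` and the default multiplier of 2 for I/O-bound work are assumptions picked for the example, not measured values, so treat the multiplier as something to tune.

```csharp
using System;

public static class WorkerSizing
{
    // CPU-bound work: one worker per core.
    // I/O-bound work: a few more, here cores * ioMultiplier (2 is an arbitrary default).
    public static int WorkerCount(bool ioBound, int ioMultiplier = 2)
    {
        int cores = Environment.ProcessorCount;
        return ioBound ? cores * ioMultiplier : cores;
    }
}
```

The result would then be the number of worker threads you start against the blocking queue.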

Ted Elliott
Good reference -- I've used the same implementation in a classic single producer/multiple consumer scenario. I modified it one time as well, turning it into a byte queue (less overhead with boxing/unboxing value types) to do large file processing -- worked great. Very solid implementation.
Peter Meyer
+1  A: 

There is a possible logic error in the code posted with the question, depending on where the item id in ValidateItem(int id) comes from. Why? Because although you correctly lock before checking your validatingItems and validatedItems sets and queueing a work item, you do not add the item to the validatingItems set until the new thread spins up. That means there could be a time gap where another thread calls ValidateItem(id) with the same id (unless this is running on a single main thread).

I would add the item to the validatingItems set just before queueing the work item, inside the lock.

Edit: also, QueueUserWorkItem() returns a bool, so you should use the return value to make sure the item was queued and THEN add it to the validatingItems set.

Mitch Wheat
Correction to your edit: that reintroduces the bug. You need to add it, then queue, then remove it if queueing failed.
TheSoftwareJedi
+1  A: 

ThreadPool may not be optimal for jamming so much at once into it. You may want to research the upper limits of its capabilities and/or roll your own.

Also, there is a race condition that exists in your code, if you expect no duplicate validations. The call to

this.validatingItems.Add(itemId);

needs to happen in the main thread (ValidateItem), not in the thread pool thread (Validate method). This call should occur a line before the queueing of the work item to the pool.

A worse bug is not checking the return value of QueueUserWorkItem. Queueing can fail, and why it doesn't throw an exception is a mystery to us all. If it returns false, you need to remove the item that was added to the validatingItems set and handle the error (probably by throwing an exception).
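Put together, the add-then-queue-then-roll-back order could look roughly like this. It is a sketch, not a drop-in replacement for the class in the question: the name `DedupingValidationService`, the `work` callback standing in for the time-consuming routine, and the `IsValidated` helper are all invented for the example. It also rolls back if QueueUserWorkItem throws rather than returning false.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class DedupingValidationService
{
    private readonly object locker = new object();
    private readonly HashSet<int> validatedItems = new HashSet<int>();
    private readonly HashSet<int> validatingItems = new HashSet<int>();
    private readonly Action<int> work; // hypothetical stand-in for the slow validation

    public DedupingValidationService(Action<int> work)
    {
        this.work = work;
    }

    public void ValidateItem(int id)
    {
        lock (this.locker)
        {
            if (this.validatedItems.Contains(id)) return;
            // Claim the id before queueing; Add returns false if it is already claimed.
            if (!this.validatingItems.Add(id)) return;
        }

        // Queueing happens outside the lock; the claim above already blocks duplicates.
        bool queued = false;
        try
        {
            queued = ThreadPool.QueueUserWorkItem(_ => this.Validate(id));
        }
        finally
        {
            // Roll the claim back if the item never made it into the pool.
            if (!queued)
            {
                lock (this.locker) { this.validatingItems.Remove(id); }
            }
        }
        if (!queued) throw new InvalidOperationException("Could not queue item " + id);
    }

    public bool IsValidated(int id)
    {
        lock (this.locker) { return this.validatedItems.Contains(id); }
    }

    private void Validate(int itemId)
    {
        bool succeeded = false;
        try
        {
            this.work(itemId);
            succeeded = true;
        }
        finally
        {
            lock (this.locker)
            {
                this.validatingItems.Remove(itemId);
                // Only record the item as validated if the work completed.
                if (succeeded) this.validatedItems.Add(itemId);
            }
        }
    }
}
```

With the claim made under the lock in ValidateItem, two callers passing the same id at the same moment can no longer both queue it.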

TheSoftwareJedi
A: 

You could also try using the CCR - Concurrency and Coordination Runtime. It's buried inside Microsoft Robotics Studio, but provides an excellent API for doing this sort of thing.

You'd just need to create a "Port" (essentially a queue), hook up a receiver (method that gets called when something is posted to it), and then post work items to it. The CCR handles the queue and the worker thread to run it on.

Here's a video on Channel9 about the CCR.

It's very high-performance and is even being used for non-robotics stuff (Myspace.com uses it behind the scenes for their content-delivery network).