views:

55

answers:

1

Currently I'm in the process of designing the messaging system for my application (which uses AMQP on the backend via RabbitMQ). There are going to be multiple instances where a method can get data from multiple sources at the same time (ie. doesn't have to be sequential queries).

Originally, I was going to use the ThreadPool and QueueUserWorkItem for each different request in the method, and then join them up somehow. This may be problematic, because several different components of the application can do this at once, and each component could have a large number of parallel requests which would starve the ThreadPool.

Is there a more efficient/effective way of doing this?

+1  A: 

It's ok to stress the threadpool. You can throw a bunch of workitems on it - hundreds, thousands - and just let 'er rip. Not sure what you mean by "starve". Unless there is a set of work items that ought to be prioritized differently, you probably don't need to worry about starvation.

If you use QUWI, it's up to you to figure out how to merge the parallelized results back into one single result.


Sounds to me like you are doing a map/reduce approach. Here's a quick map function that uses QUWI, and an example of how to use it.

public static IEnumerable<T2> Map_QUWI<T, T2>(List<T> inputs, Func<T, T2> fn)
{
    int c = inputs.Count;
    if (c == 0) return null;
    T2[] result = new T2[c];
    if (c == 1)
    {
        // only one input - perform the work on main thread
        result[0] = fn(inputs[0]);
        return result;
    }

    using (ManualResetEvent done = new ManualResetEvent(false))
    {
        int countdown = inputs.Count;
        WaitCallback cb = delegate (Object obj)
            {
                int ix = (int)obj;
                result[ix] = fn(inputs[ix]);
                if (Interlocked.Decrement(ref countdown) == 0)
                    done.Set(); // signal all done
            };

        // queue up all workitems
        for (int i = 0; i < c; i++)
            ThreadPool.QueueUserWorkItem(cb,i);

        // Wait for done.Set(), which happens in the WaitCallback
        // when the last workitem is completed.
        done.WaitOne();
    }

    return result;
}

Example of use:

// returns the number of prime numbers, less than or equal to x
private int NumberOfPrimesLessThanOrEqualTo(int x)
{
    int count= 0;
    int n = x;
    if (n>=2) count++;
    if (x%2==0) n--;
    if (n>0)
    {
        do
        {
            if (IsPrime(n)) count++;
            n-=2;
        } while (n>0);
    }
    return count;
}


private void Demo()
{
    var list = new List<int>(new int[] {2,4,8,16,32,64,128,256,512,1024,2048,
                                        4096,8192,16384,32768,65536,131072});
    Func<int,int> fn = NumberOfPrimesLessThanOrEqualTo;
    var result= Map_QUWI(list, fn);
    (new List<int>(result)).ForEach(System.Console.WriteLine);
}
Cheeso