views: 132
answers: 2

I am using the concurrent collections in my .NET 4 application, and I have a ConcurrentQueue that gets added to while I am processing it.

I want to do something like:

    personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

as I make database calls to save the data, so I am limiting the number of concurrent threads.

But, I expect that the ForAll isn't going to dequeue, and I am concerned about just doing

    ForAll(i => {
        personqueue.TryDequeue(...);
        ...
    });

as there is no guarantee that I am popping off the correct one.

So, how can I iterate through the collection and dequeue, in a parallel fashion?

Or, would it be better to use PLINQ to do this processing, in parallel?

+1  A: 

Hi

Well, I'm not 100% sure what you're trying to achieve here. Are you trying to dequeue all items until nothing is left, or just dequeue lots of items in one go?

The first bit of probably unexpected behavior starts with this statement:

    theQueue.AsParallel()

For a ConcurrentQueue, you get a 'Snapshot'-Enumerator. So when you iterate over a concurrent queue, you only iterate over the snapshot, not the 'live' queue.
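
A quick way to see this (a minimal console sketch for illustration, assuming .NET 4):

    using System;
    using System.Collections.Concurrent;

    class SnapshotDemo
    {
        static void Main()
        {
            var queue = new ConcurrentQueue<int>();
            queue.Enqueue(1);
            queue.Enqueue(2);

            // The enumerator iterates a moment-in-time snapshot, so items
            // enqueued while the loop runs are never yielded.
            foreach (var item in queue)
            {
                queue.Enqueue(item + 10); // lands in the live queue...
                Console.WriteLine(item);  // ...but only 1 and 2 print
            }
        }
    }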

In general I think it's not a good idea to iterate over something you're changing during the iteration.

So an alternative solution would look like this:

    // This way it's clearer that we only dequeue theQueue.Count items.
    // However, after this the queue is probably not empty
    // (items may have been enqueued in the meantime),
    // or it may even run dry earlier if someone else is dequeuing too.
    Parallel.For(0, theQueue.Count,
                 new ParallelOptions { MaxDegreeOfParallelism = 20 },
                 i =>
                 {
                     Person person; // assuming the queue holds Person items
                     if (theQueue.TryDequeue(out person))
                     {
                         // ... and stuff
                     }
                 });

This avoids manipulating something while iterating over it. However, after that statement, the queue can still contain data that was added during the loop.

To get the queue empty at a moment in time, you probably need a little more work. Here's a really ugly solution: while the queue still has items, create new tasks. Each task dequeues from the queue for as long as it can. At the end, we wait for all tasks to finish. To limit the parallelism, we never create more than 20 tasks.

    // Probably a kitty died because of this ugly code ;)
    // However, this code tries to get the queue empty in a very aggressive way.
    const int MaxParallelism = 20;

    Action consumeFromQueue = () =>
    {
        Person person;
        while (theQueue.TryDequeue(out person))
        {
            // do your stuff
        }
    };

    var runningTasks = new List<Task>();
    for (int i = 0; i < MaxParallelism && theQueue.Count > 0; i++)
    {
        runningTasks.Add(Task.Factory.StartNew(consumeFromQueue));
    }
    // A plain array would contain null slots if the queue ran dry early,
    // which makes Task.WaitAll throw; a list avoids that.
    Task.WaitAll(runningTasks.ToArray());
Gamlor
I will try the middle idea, as that may work. This is part of a web service that may get 10k updates in a very short time, one at a time, but I don't want to hammer the database by trying to do each update as I get it, so I will shove them into a static queue in a singleton and have a function do the processing. My solution isn't perfect, but I need to protect the database as a high priority.
James Black
A: 

If you are aiming at a high-throughput real site and you don't have to do immediate DB updates, you'll be much better off going for a very conservative solution rather than extra layers of libraries.

Make a fixed-size array (guesstimate the size - say 1000 items, or N seconds' worth of requests) and an interlocked index, so that requests just put data into slots and return. When one block gets filled (keep checking the count), make another one and spawn an async delegate to process and send to SQL the block that just got filled.

Depending on the structure of your data, that delegate can pack all the data into comma-separated arrays, maybe even a simple XML (got to test the perf of that one, of course), and send them to a SQL sproc, which should do its best to process them record by record - never holding a big lock. If it gets heavy, you can split your block into several smaller blocks. The key thing is that you minimize the number of requests to SQL, always keep one degree of separation, and don't even have to pay the price for a thread pool - you probably won't need to use more than 2 async threads at all.
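
To sketch the slot idea (all names here are illustrative, not a definitive implementation):

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    // Requests claim a slot via an interlocked index and return immediately;
    // when a block fills up, the caller swaps in a fresh block and ships the
    // full one to SQL in a single bulk call on a background task.
    class RequestBlock<T>
    {
        private readonly T[] slots;
        private int next = -1;

        public RequestBlock(int size) { slots = new T[size]; }

        // Returns true if the item got a slot; false means the block is full
        // and the caller should swap in a fresh block, flush this one, and
        // retry the add against the new block.
        public bool TryAdd(T item)
        {
            int index = Interlocked.Increment(ref next);
            if (index >= slots.Length) return false;
            slots[index] = item;
            return true;
        }

        // Hand the filled block to a bulk-insert delegate off the request thread.
        public Task Flush(Action<T[]> bulkSendToSql)
        {
            return Task.Factory.StartNew(() => bulkSendToSql(slots));
        }
    }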

That's going to be a lot faster than fiddling with Parallel-s.

ZXX
My hope was to just have the queue be emptied as it is filled, so if I get hundreds of items in one web service call, and then they send another large batch, it will just keep going through while there is something inside. At the moment I sleep 10 seconds, process the queue, then sleep another 10 seconds.
James Black