Hello everyone,

I am attempting to use multi-threading for the first time and am running into some unexpected issues. I hope you can help.

Here's the code fragment that is giving me trouble:

ArrayList recordsCollection = new ArrayList();
ArrayList batchCollection = null;
int idx = 0;

while(true)
{
  // Some code to generate and assign new batchCollection here
  recordsCollection.Add(batchCollection);

  ThreadPool.QueueUserWorkItem(delegate
  {
    ProcessCollection(recordsCollection.GetRange(idx, 1));
  });
  Interlocked.Increment(ref idx);
}

private void ProcessCollection(ArrayList collection)
{
   // Do some work on collection here
}

Once the ProcessCollection method is invoked and I attempt to iterate through collection, I get "The range in the underlying list is invalid".

Thanks in advance!

Update: Guys, thank you to each and every one of you. By applying your suggestions I was able to greatly simplify the code and get it to work.

+5  A: 

Your use of Interlocked.Increment here is unnecessary. You want the local variable idx to be seen by only one thread, so there is no need for locking.

Currently you are "closing over the loop variable" which means that the threads see the latest value of the variable rather than the value at the time the delegate was created. You want the other threads to receive copies of this variable. These copies will then not change even if the original variable changes.

Try changing your code to this:

int j = idx;
ThreadPool.QueueUserWorkItem(delegate
{
    ProcessCollection(recordsCollection.GetRange(j, 1));
});
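
To see the difference in isolation, here is a minimal sketch that leaves the thread pool out entirely and simply invokes the delegates after the loop has finished. The names CaptureDemo, shared and copied are made up for illustration and are not part of the original code.

using System;
using System.Collections.Generic;

class CaptureDemo
{
    static void Main()
    {
        var shared = new List<Func<int>>();
        var copied = new List<Func<int>>();

        int idx = 0;
        while (idx < 3)
        {
            // Captures the variable idx itself; every delegate shares it.
            shared.Add(delegate { return idx; });

            // j is declared inside the loop body, so each iteration gets
            // its own variable and each delegate captures its own copy.
            int j = idx;
            copied.Add(delegate { return j; });

            idx++;
        }

        // All delegates run after the loop, when idx is already 3.
        foreach (var f in shared) Console.Write(f() + " ");  // 3 3 3
        Console.WriteLine();
        foreach (var f in copied) Console.Write(f() + " ");  // 0 1 2
        Console.WriteLine();
    }
}

With the thread pool the timing is less predictable, but the effect is the same: a delegate that reads idx sees whatever value idx has when the delegate runs, not the value it had when the work item was queued.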

Mark Byers
+1  A: 

You are playing with fire here. You have an open closure; see http://en.wikipedia.org/wiki/Closure_(computer_science)

Also, why use GetRange if you are only getting one item?

Using a generic list could also help.

    private void wee()
    {
        List<List<string>> recordsCollection = new List<List<string>>();

        //int idx = 0;

        while(true)
        {
            //scope the batchcollection here if you want to start a thread with an anonymous delegate
            List<string> batchCollection = null;
            // Some code to generate and assign new batchCollection here
            recordsCollection.Add(batchCollection);

            ThreadPool.QueueUserWorkItem(delegate
            {
                ProcessCollection(batchCollection);
            });
            //Interlocked.Increment(ref idx);
        }
    }
    private void ProcessCollection(List<string> collection)
    {
        // Do some work on collection here
    }

Correct me if I'm wrong, but I do not think you will need the idx variable anymore.

Also, don't forget that exceptions are thrown up the call stack: http://www.codeproject.com/KB/architecture/exceptionbestpractices.aspx

Cheers !

Alex Rouillard
+2  A: 

You have a couple of problems.

  • As Mark pointed out, you are capturing a loop variable, which will really confuse things here.
  • You are modifying the collection while reading it at the same time, without any synchronization mechanism.

I am assuming you have omitted the code for acquiring a batchCollection and then periodically removing them from recordsCollection for brevity, otherwise there would be issues there as well.
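
One way the second point can show up, even without any threads, is through the view that ArrayList.GetRange returns. The single-threaded sketch below is only an illustration under that assumption; the class name RangeViewDemo and the sample strings are hypothetical. As documented for ArrayList.GetRange, the returned list is a window onto the source rather than a copy, and changing the source directly invalidates it.

using System;
using System.Collections;

class RangeViewDemo
{
    static void Main()
    {
        ArrayList source = new ArrayList();
        source.Add("first");

        // GetRange returns a view over source, not a copy of the elements.
        ArrayList view = source.GetRange(0, 1);

        // Adding to the source directly invalidates the view.
        source.Add("second");

        try
        {
            foreach (object item in view)
            {
                Console.WriteLine(item);
            }
        }
        catch (InvalidOperationException ex)
        {
            // Expected path: the view no longer matches its source.
            Console.WriteLine("View invalidated: " + ex.Message);
        }
    }
}

In the original loop, the main thread keeps calling Add while worker threads are still iterating over ranges taken earlier, so the same kind of invalidation is possible there.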

Here is how you can fix it.

ArrayList recordsCollection = new ArrayList();  
ArrayList batchCollection = null;  
int idx = 0;  

while(true)  
{  
  lock (recordsCollection) 
  {
    recordsCollection.Add(batchCollection);  
  }

  int capturedIndex = idx; // Used for proper capturing.

  ThreadPool.QueueUserWorkItem(delegate  
  {
    ArrayList range;
    lock (recordsCollection)
    {
      range = recordsCollection.GetRange(capturedIndex, 1);
    }
    ProcessCollection(range);  
  });  

  idx++;
}  

Or my refactored version which, as best I can tell anyway, does the exact same thing...

List<List<Record>> recordsCollection = new List<List<Record>>();
List<Record> batchCollection = null;  

while(true)  
{  
  recordsCollection.Add(batchCollection);

  List<List<Record>> range = new List<List<Record>>();
  range.Add(batchCollection);

  ThreadPool.QueueUserWorkItem(delegate  
  {
    ProcessCollection(range);  
  });      
}  
Brian Gideon