
OK, I'm playing around with the .NET 4.0 Parallel Extensions in System.Threading.Tasks. I'm seeing what looks like weird behavior, but I assume I'm just doing something wrong. I have an interface and a couple of implementing classes; they're kept simple for this example.

using System.Collections.Concurrent; // BlockingCollection<T>
using System.IO;                     // Stream

interface IParallelPipe
{
    void Process(ref BlockingCollection<Stream> stream, long stageId);
}

class A : IParallelPipe
{
    public void Process(ref BlockingCollection<Stream> stream, long stageId)
    {
        //do stuff
    }
}

class B : IParallelPipe
{
    public void Process(ref BlockingCollection<Stream> stream, long stageId)
    {
        //do stuff
    }
}

I then have a class that kicks off processing with these, and this is where the problem arises. For each stage, I work out which implementing class to use from the stage's type, call a factory to instantiate it, and then create and start a task that calls its Process method:

BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();                   
foreach (Stage s in pipeline.Stages) 
{
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    Task.Factory.StartNew(() => p.Process(ref bcs, s.id)); 
}

In each run of my sample, pipeline.Stages contains two elements: one that gets instantiated as class A and the other as class B. That part works; in the debugger I can see p coming back as the two different types. However, B.Process is never called; instead I get two invocations of A.Process(...). Both receive the stageId of a stage that was passed in (i.e. the two invocations have different stageIds).

Now, if I separate things out a bit, just for testing, I can get it to work by doing something like this:

BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();                   
A a = null;
B b = null;
foreach (Stage s in pipeline.Stages) 
{
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    // p is typed as IParallelPipe, so an explicit cast is needed to assign it to A or B
    if (p is A)
        a = (A)p;
    else
        b = (B)p;
}
Task.Factory.StartNew(() => a.Process(ref bcs, idThatINeed)); 
Task.Factory.StartNew(() => b.Process(ref bcs, idThatINeed));

This invokes the appropriate class in each task!

Any thoughts?

+3  A: 

The behaviour you're describing seems odd to me: I'd expect the right instances to be used, but potentially with the wrong stage ID. That's the old foreach variable capture problem. The lambda captures the variable s itself, not its value at the time the task is created, and by the time the task actually runs the lambda, the loop may have moved on so that s refers to a later stage.
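To see the capture behaviour in isolation, here is a minimal, single-threaded sketch. One caveat: starting with C# 5 (Visual Studio 2012) the compiler gives each foreach iteration its own copy of the loop variable, so the foreach code in the question no longer shows this problem on newer compilers. A for loop still shares one variable across all iterations, so the sketch uses that to reproduce the effect. The class and method names are made up for illustration and are not part of the question's code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class (not from the question): shows a lambda capturing a shared loop variable.
public static class LoopCaptureDemo
{
    // Every lambda closes over the single variable i declared by the for loop,
    // so they all observe whatever value i holds when they finally run.
    public static List<int> CaptureLoopVariable(int count)
    {
        var readers = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            readers.Add(() => i);
        }

        // Invoke the delegates only after the loop has finished,
        // much like a task that starts running after the loop has moved on.
        var seen = new List<int>();
        foreach (Func<int> read in readers)
        {
            seen.Add(read());
        }
        return seen;
    }
}
```

Calling `LoopCaptureDemo.CaptureLoopVariable(3)` returns 3, 3, 3 rather than 0, 1, 2, because all three delegates read the same variable after the loop has left it at 3. With tasks the timing is nondeterministic, so some tasks may see an intermediate value instead.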

The capture issue is definitely a problem in your code, but it doesn't explain the symptom you're seeing. Just to check: you really are declaring p inside the loop, not outside it? If p were declared outside the loop, every lambda would share that single variable and call Process on whichever pipe p held when the task ran, which would explain everything.

Here's the fix for the capture problem though:

BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages) 
{
    Stage copy = s;
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    Task.Factory.StartNew(() => p.Process(ref bcs, copy.id)); 
}

Note that we're just taking a copy inside the loop and capturing that copy, so each iteration gets a different "instance" of the variable.
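The same trick can be checked without any threading. This is a hypothetical single-threaded sketch (the names are invented for illustration) that copies a for-loop variable into a local declared inside the loop body before capturing it:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class (not from the question): the "take a copy inside the loop" fix.
public static class LoopCopyDemo
{
    public static List<int> CaptureCopy(int count)
    {
        var readers = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            // A local declared inside the loop body is a new variable on every
            // iteration, so each lambda captures its own copy of the value.
            int copy = i;
            readers.Add(() => copy);
        }

        // Invoke the delegates after the loop has finished.
        var seen = new List<int>();
        foreach (Func<int> read in readers)
        {
            seen.Add(read());
        }
        return seen;
    }
}
```

Calling `LoopCopyDemo.CaptureCopy(3)` returns 0, 1, 2: each delegate still sees the value from its own iteration, even though it runs after the loop has ended.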

Alternatively, instead of capturing the stage, we could just capture the ID as that's all we need:

BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages) 
{
    long id = s.id;
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    Task.Factory.StartNew(() => p.Process(ref bcs, id)); 
}

If that doesn't help, could you post a short but complete program which demonstrates the problem? That would make it a lot easier to track down.

Jon Skeet
I thought the same thing ("this has nothing to do with p"), which is why I didn't think of the foreach issue. However, I added the "Stage copy" line and the problem cleared up. Interesting. I'll do some more investigation...
MikeD
And yes, I am declaring p within the loop.
MikeD