Hi folks,

I have an observable collection that I want to process in parallel. I then want to observe the processed values, filter them, and finally subscribe a handler that receives the filtered values.

My sample is syntactically correct and compiles just fine, and when I run the code, the Where statement doing the filtering is evaluated. But no data comes through to the subscription. If I remove AsParallel so that the processing is done over a regular IEnumerable, data comes through and everything works as expected.

Here is my sample, doing some processing on strings:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

The next weird thing is that if I use the TakeWhile operator, which in my mind is conceptually similar to Where, observing the ParallelQuery works as expected:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Adding some logging code to the subscription shows that data is received up until the ToObservable conversion, but not after:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

A breakpoint in the lambda at line 4 is hit while a breakpoint in the lambda at line 6 is never hit.

Why will TakeWhile make data come through to the subscriber while Where does not?

If it is of importance, I develop my code in Visual Studio 2010 RC with a project targeting .Net 4.0 Framework Client Profile.

Update: based on @Sergey's answer I reworked the placement of the Where filter. The following code works as expected:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

It still feels a bit awkward to have to first convert the initial observable processedStrings into an enumerable in order to parallelize it, and then convert it back to an observable in order to subscribe to the final result.

+2  A: 

TakeWhile isn't conceptually equivalent to Where, because it depends on ordering. I suspect that the query is actually executing sequentially (see this blog post). Try calling .WithExecutionMode(ParallelExecutionMode.ForceParallelism) in your TakeWhile example, and I suspect you'll see the same result.

I don't know why it's not working in the parallel case though... can I suggest that you put in some logging to see how far the data reaches? You can perform useful logging with a Select which returns the original item after logging it, for example.
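As a minimal sketch of that logging idea, the snippet below uses plain PLINQ only (no Rx), so it shows the technique rather than reproducing the problem. The `Log` extension method and the stage names are hypothetical and not part of any library; the helper just wraps a pass-through Select.

```csharp
using System;
using System.Linq;

static class PlinqLogging
{
    // Hypothetical helper: writes each item at a named stage, then passes it through unchanged.
    public static ParallelQuery<T> Log<T>(this ParallelQuery<T> source, string stage)
    {
        return source.Select(item =>
        {
            Console.WriteLine("[{0}] {1}", stage, item);
            return item;
        });
    }
}

static class Program
{
    static void Main()
    {
        var results = new[] { "a", "", "b", "c" }
            .AsParallel()
            .Log("before Where")
            .Where(value => !String.IsNullOrEmpty(value))
            .Log("after Where")
            .ToList();

        // Three of the four items are non-empty, so three reach this point.
        Console.WriteLine("Items that passed the filter: {0}", results.Count);
    }
}
```

Placing one `Log` call on each side of a suspect operator shows which stage the items stop at. The order of the log lines is not guaranteed, because PLINQ may run the stages on several threads.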

Jon Skeet
Hm, well, if it was executing sequentially perhaps it would have worked. It is the introduction of AsParallel() that halts processing. Adding WithExecutionMode does not change the behavior: data is still not coming through when filtering with Where. I will try to add some more logging, yet a breakpoint in the Where lambda shows that data at least reaches the Where operator...
Peter Lillevold
Hm, why the downvote?
Peter Lillevold
@Peter: The downvote was one of a series of downvotes at roughly the same time, so probably just someone with a grudge. The idea of WithExecutionMode(...) was to show it failing with TakeWhile as well, not to make Where work. You might want to put a logging operation just before ToObservable() to see what the observable is actually receiving. Additionally, it's probably worth investigating whether there's some way of parallelising the observable itself - it feels odd to flit between the two models like this.
Jon Skeet
Added logging which only confirms the fact: data is received up until the final conversion to observable.
Peter Lillevold
+2  A: 

From C# 4.0 in a Nutshell:


There are currently some practical limitations on what PLINQ can parallelize. These limitations may loosen with subsequent service packs and Framework versions. The following query operators prevent a query from being parallelized, unless the source elements are in their original indexing position:

  • Take, TakeWhile, Skip, and SkipWhile
  • The indexed versions of Select, SelectMany, and ElementAt

Most query operators change the indexing position of elements (including those that remove elements, such as Where). This means that if you want to use the preceding operators, they’ll usually need to be at the start of the query.


So, in fact, using TakeWhile prevents the .AsParallel() from parallelizing. It is hard to say why Where kills the subscription, but putting it before AsParallel might fix the problem.
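A rough sketch of that reordering, using plain LINQ and PLINQ without Rx: the filter runs sequentially first, and only the surviving items are handed to AsParallel. The `FilterThenProcess` method name and the sample input are hypothetical, chosen only for illustration, and this does not claim to explain why the original pipeline stalls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class Program
{
    // Hypothetical helper: filters sequentially, then lets PLINQ process what is left.
    public static List<string> FilterThenProcess(IEnumerable<string> source)
    {
        return source
            .Where(value => !String.IsNullOrEmpty(value)) // ordinary LINQ to Objects, runs before AsParallel
            .AsParallel()
            .Select(value => "Parallel " + value)         // the part that may run in parallel
            .ToList();
    }

    static void Main()
    {
        var input = new[] { "one", "", "two", null, "three" };
        var processed = FilterThenProcess(input);

        // Without AsOrdered, PLINQ does not guarantee output order, so sort before printing.
        processed.Sort(StringComparer.Ordinal);
        Console.WriteLine(String.Join(", ", processed));
    }
}
```

With the filter ahead of AsParallel, the parallel part of the query contains only a Select, so none of the ordering-sensitive operators from the quoted list are involved.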

Sergey Aldoukhov
Do you mean that .TakeWhile used *before* .AsParallel will prevent parallelizing? I'm a bit confused as to how this explains what I'm seeing. In my sample, as far as I can tell, neither TakeWhile nor Where breaks the parallelizing part (data always reaches the Where lambda). On the other hand, .Where somehow prevents data from being pushed further by the observable.
Peter Lillevold
No, it is the opposite: TakeWhile used before AsParallel would not prevent parallelizing. The culprit is that ToObservable, for some reason (and I do not know why), does not work with parallel data coming into the pipeline.
Sergey Aldoukhov