Hi folks,
I have an observable collection that I want to process in parallel, then filter the processed values, and finally subscribe a handler that receives the filtered values.
My sample is syntactically correct and compiles just fine, and when I run the code, the Where clause doing the filtering is evaluated. But no data comes through to the subscription. If I remove AsParallel so that the processing is done over a regular IEnumerable, data comes through and everything works as expected.
Here is my sample, doing some processing on strings:
    // Generate some data every second
    var strings = Observable.Generate(() =>
        new TimeInterval<Notification<string>>(
            new Notification<string>.OnNext(DateTime.Now.ToString()),
            TimeSpan.FromSeconds(1)));

    // Process the data in parallel
    var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                          select "Parallel " + value;

    // Filter and observe
    var data = String.Empty;
    parallelStrings
        .Where(value => !String.IsNullOrEmpty(value))
        .ToObservable()
        .Subscribe(value => data = value);
The next weird thing is that if I use the TakeWhile operator, which in my mind is conceptually similar to Where, observing the ParallelQuery works as expected:
    // Filter and observe
    var data = String.Empty;
    parallelStrings
        .TakeWhile(cs => !String.IsNullOrEmpty(cs))
        .ToObservable()
        .Subscribe(value => data = value);
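For reference, here is a minimal sketch of how the two operators differ on a plain in-memory sequence, with no Rx or PLINQ involved. Where tests every element, while TakeWhile stops at the first element that fails the predicate. The class name, the method names and the sample array are hypothetical and only serve the illustration; they are not part of the sample above.

```csharp
using System;
using System.Linq;

public static class WhereVersusTakeWhile
{
    // Hypothetical helper: keeps every non-empty string in the input.
    public static string[] FilterAll(string[] values)
    {
        return values.Where(v => !String.IsNullOrEmpty(v)).ToArray();
    }

    // Hypothetical helper: keeps the leading run of non-empty strings
    // and stops at the first empty one.
    public static string[] TakePrefix(string[] values)
    {
        return values.TakeWhile(v => !String.IsNullOrEmpty(v)).ToArray();
    }

    public static void Main()
    {
        var values = new[] { "a", "", "b" };
        Console.WriteLine(String.Join(",", FilterAll(values)));  // prints "a,b"
        Console.WriteLine(String.Join(",", TakePrefix(values))); // prints "a"
    }
}
```

Since the generated strings in my sample are never empty, both predicates accept every value, so this difference alone should not change which values reach the subscriber.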
Adding some logging code to the subscription shows that data is received up until the ToObservable conversion, but not after:
    1. var data = String.Empty;
    2. parallelStrings
    3.     .Where(value => !String.IsNullOrEmpty(value))
    4.     .Select(value => value)
    5.     .ToObservable()
    6.     .Select(value => value)
    7.     .Subscribe(value => data = value);
A breakpoint in the lambda at line 4 is hit, while a breakpoint in the lambda at line 6 is never hit.
Why does TakeWhile make data come through to the subscriber while Where does not?
In case it matters, I am developing in Visual Studio 2010 RC with a project targeting the .NET Framework 4.0 Client Profile.
Update: based on @Sergey's answer I reworked the placement of the Where filter. The following code works as expected:
    var processedStrings = from value in strings
                           let processedValue = "Parallel " + value
                           where !String.IsNullOrEmpty(processedValue)
                           select processedValue;

    var data = String.Empty;
    processedStrings
        .ToEnumerable()
        .AsParallel()
        .ToObservable()
        .Subscribe(value => data = value);
It still feels a bit awkward to have to first convert the initial observable processedStrings into an enumerable in order to parallelize it, and then convert it back to an observable in order to subscribe to the final result.