views:

29

answers:

1

Imagine the following linq to observables statement:

var x = from result1 in service1.operation()
        from result2 in service2.operation()
        from result3 in service3.operation()
        select DoSomething()

x.Subscribe()

void Unit DoSomething() {
 ...
}

The services all return a cold observable, so they will wait for each one to complete and then DoSomething gets called.

Now is service2.operation returns an Observable.Empty which is basically an oncomplete notification and that will mean service3 will never get called and DoSomething neither.

I wanted it to continue the chain if an oncomplete gets returned , but provide a default value for result2. So I created a OnEmptyReturnDefault extension method

public static IObservable<T> OnEmptyReturnDefault<T>(this IObservable<T> observable)
{
    var maybeReturnsSomething = observable.Memoize(); // Custom Lazy caching extension method
    var whenEmpty = from isEmpty in maybeReturnsSomething.IsEmpty()
                    where isEmpty
                    select default(T);
    var whenNotEmpty = from isEmpty in maybeReturnsSomething.IsEmpty()
                        where !isEmpty
                        from notEmpty in maybeReturnsSomething
                        select notEmpty;
    return whenEmpty.Merge(whenNotEmpty);
}

Allowing me to do:

var x = from result1 in service1.operation()
            from result2 in service2.operation().OnEmptyReturnDefault()
            from result3 in service3.operation()
            select DoSomething()

All is good, except that my solution is blocking. IsEmpty() essentially does Take(1).Count() == 0. I want a solution that is not blocking.

+1  A: 

A solution suggested on another site works well:

public static IObservable<T> DefaultIfEmpty<T>(this IObservable<T> src, T defaultValue)
{
  return src
    .Materialize()
    .Select((n, i) => (n.Kind == NotificationKind.OnCompleted && i == 0)
                ? new Notification<T>[]
                {
                  new Notification<T>.OnNext(defaultValue), 
                  new Notification<T>.OnCompleted()
                }
                : new[] {n})
    .SelectMany(ns => ns)
    .Dematerialize();
}
Max