I am having a problem understanding how Observable.Delay works and when Dispose() is meant to be called. Would anyone familiar with Rx be able to help, please?

The following code snippet:

    static void Main(string[] args)
    {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));

        Console.ReadKey();
    }

    public class SomeObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> o)
        {
            for (var i = 0; i < 2; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();

            return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
    }

    public class DisposableAction : IDisposable
    {
        public DisposableAction(Action dispose)
        {
            this.dispose = dispose;
        }

        readonly Action dispose;

        public void Dispose()
        {
            dispose();
        }
    }

produces the following result:

0
1
DISPOSED
DISPOSED
DISPOSED
...0...
...1...
......0......
......1......

I was expecting it to be more like:

0
1
DISPOSED
...0...
...1...
DISPOSED
......0......
......1......
DISPOSED

Any ideas?

A: 

Without the ThreadPool the behaviour is identical:

0
1
DISPOSED
DISPOSED
...0...
...1...
......0......
......1......

Zubin Appoo
A: 

If I had to guess, I'd say that Delay queues up the items coming from the original observable and then dispatches them according to the specified delay. Thus, even though the original observable has long been disposed, the observable created by the Delay method is still alive and kicking. The behavior you are observing fits this explanation nicely.

PL
Hmm, that kinda makes sense, but why do the delayed observables get disposed before they have even started? As I understand it, the IObserver created by the Subscribe(onNext) extension method only calls Dispose on OnCompleted or OnException. Or am I still not fully understanding this?
Ted
It's not a delayed observable that got disposed, it's the original one. Try adding () => Console.WriteLine("completed") to the Subscribe call. You'll see that completed is called when you expect it.
PL
Ahh, I see what you mean now.
Ted
I also posted this on another forum (the Rx forum) and got a really good explanation: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/a6897079-ccd3-4baf-aa55-d77545e8215b Thanks
Ted
A: 

The standard functionality of Rx is to dispose a subscription when the sequence completes (even if its values are still being piped through another sequence).

With that in mind, Delay cannot control the speed at which values are emitted from the source sequence; it can only delay the values it passes on to its own observers.

Richard Szalay
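
The explanations in the answers above can be modelled without Rx at all. What follows is only a simplified sketch, not Rx's actual Delay implementation: SyncSource, BufferingObserver and DelayModel are hypothetical names made up for this illustration, and a plain Thread.Sleep stands in for Rx's scheduler. It assumes, as the answers describe, that values are buffered as they arrive and that the source subscription is disposed as soon as the source has completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Same helper as in the question: runs an action when disposed.
public sealed class DisposableAction : IDisposable
{
    private readonly Action dispose;
    public DisposableAction(Action dispose) { this.dispose = dispose; }
    public void Dispose() { dispose(); }
}

// Hypothetical stand-in for SomeObservable: it emits everything
// synchronously inside Subscribe, before the IDisposable is returned.
public sealed class SyncSource : IObservable<int>
{
    private readonly Action<string> log;
    public SyncSource(Action<string> log) { this.log = log; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (var i = 0; i < 2; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new DisposableAction(() => log("DISPOSED"));
    }
}

// Hypothetical observer that buffers values instead of printing them.
public sealed class BufferingObserver : IObserver<int>
{
    private readonly Queue<int> pending;
    private readonly Action onCompleted;

    public BufferingObserver(Queue<int> pending, Action onCompleted)
    {
        this.pending = pending;
        this.onCompleted = onCompleted;
    }

    public void OnNext(int value) { pending.Enqueue(value); }
    public void OnError(Exception error) { /* ignored in this sketch */ }
    public void OnCompleted() { onCompleted(); }
}

public static class DelayModel
{
    // Returns the sequence of events in the order they happen.
    public static List<string> Run(TimeSpan delay)
    {
        var log = new List<string>();
        var pending = new Queue<int>();
        var sourceCompleted = false;

        // Subscribing runs the whole source; the values are only buffered.
        var subscription = new SyncSource(log.Add)
            .Subscribe(new BufferingObserver(pending, () => sourceCompleted = true));

        // Assumption: the source subscription is disposed once the source
        // has completed, even though nothing has been delivered downstream.
        if (sourceCompleted) subscription.Dispose();

        // Stand-in for the scheduler: deliver the buffered values later.
        Thread.Sleep(delay);
        while (pending.Count > 0)
            log.Add(string.Format("...{0}...", pending.Dequeue()));
        log.Add("completed");
        return log;
    }
}

public static class Program
{
    public static void Main()
    {
        foreach (var line in DelayModel.Run(TimeSpan.FromSeconds(1)))
            Console.WriteLine(line);
    }
}
```

Running this prints DISPOSED first, then ...0..., ...1... and finally completed, which is the same ordering as in the question: the DISPOSED message belongs to the source, which finished immediately, while the delayed values and the completion reach the downstream observer afterwards.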