



In an IObservable sequence (in Reactive Extensions for .NET), I'd like to get the values of the previous and current elements so that I can compare them. I found an example online, similar to the one below, that accomplishes the task:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

It works fine except that it evaluates the sequence twice, which I would like to avoid. You can see that it is being evaluated twice with this code:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

The output shows twice as many of the debug lines as there are elements in the sequence.

I understand why this happens, but so far I haven't found an alternative that doesn't evaluate the sequence twice. How can I combine the previous and current with only one sequence evaluation?


If you only need to access the previous element during subscription, this is probably the simplest thing that will work. (I'm sure there's a better way, maybe a buffer operator on IObservable? The documentation is pretty sparse at the moment, so I can't really tell you.)

    EventArgs prev = null;

    sequence.Subscribe(curr =>
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        // Remember this element for the next notification
        prev = curr;
    });

EventArgs is just a stand-in for the type of your event's argument.

Gurdas Nijor
Thanks for the suggestion. I do however need the previous element before subscription because I would like to filter the sequence based on logic using the previous and current.
Got it; I'll do a bit more research.
Gurdas Nijor
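On the buffer operator speculated about in this answer: in current Rx.NET, `Buffer(2, 1)` produces overlapping windows of two elements from a single subscription, and the result is still an IObservable that can be filtered before subscribing. The sketch below assumes the System.Reactive package (operator names differed in early Rx releases); `BufferPairsDemo` and `Pairs` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferPairsDemo
{
    // Hypothetical helper: pairs each element with its predecessor via Buffer(count: 2, skip: 1).
    public static IObservable<(T Previous, T Current)> Pairs<T>(this IObservable<T> source)
    {
        return source
            .Buffer(2, 1)                        // overlapping windows: [a, b], [b, c], ...
            .Where(window => window.Count == 2)  // drop the short window flushed on completion
            .Select(window => (window[0], window[1]));
    }

    public static void Main()
    {
        var evaluations = 0;
        var sequence = Observable.Range(1, 4).Do(_ => evaluations++);

        sequence.Pairs().Subscribe(p => Console.WriteLine($"{p.Previous} -> {p.Current}"));

        // Counts how many times the Do side effect ran for the single subscription above.
        Console.WriteLine($"Source elements evaluated: {evaluations}");
    }
}
```

Because the pairing happens inside the query, a `Where` on `Previous` and `Current` can be appended before `Subscribe`.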
+2  A: 

It turns out you can use a variable to hold the previous value and refer to it and reassign it within the chain of IObservable extensions. This even works within a helper method. With the code below I can now call CombineWithPrevious() on my IObservable to get a reference to the previous value, without re-evaluating the sequence.

using System;
using System.Reactive.Linq; // Rx.NET namespace for Select/Do

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        // Captured state: holds the last element seen (default(T) before the first one).
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
Interesting, not immediately obvious that this works!
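As a rough illustration of the filtering use case mentioned earlier in the thread, the sketch below applies CombineWithPrevious() and then a `Where` that compares the two values. It assumes the System.Reactive package; `FilterDemo`, `Increases`, and the sample values are hypothetical and only there to make the example complete.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    // Same technique as in the answer: a captured variable remembers the last element.
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);
        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}

public static class FilterDemo
{
    // Hypothetical filter: keep only the values that are larger than their predecessor.
    public static IObservable<int> Increases(IObservable<int> source)
    {
        return source
            .CombineWithPrevious()
            .Skip(1) // the first pair has no real predecessor (Previous is default(int))
            .Where(pair => pair.Current > pair.Previous)
            .Select(pair => pair.Current);
    }

    public static void Main()
    {
        var readings = new[] { 1, 3, 2, 5, 5, 8 }.ToObservable();
        Increases(readings).Subscribe(value => Console.WriteLine(value));
    }
}
```

One thing to keep in mind: `previous` is captured once per call to CombineWithPrevious(), so two subscriptions to the same returned observable share that variable. Wrapping the body in `Observable.Defer` is one way to get fresh state per subscription.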
+2  A: 

Evaluating twice is an indicator of a cold observable. You can turn it into a hot one by using .Publish() and then calling Connect() on the result:

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
Sergey Aldoukhov
Found an easy to digest article about the difference between Hot and Cold Observables:
Gurdas Nijor
Using Publish and Connect I still see the side effects of the Zip(Skip(1), ...) occur twice. I suppose it's not really the original source I needed to evaluate once, but the IObservable query itself. Still, thanks for the info.
@dcstraw Strange, I published the debugSequence from your example above (I used Observable.Range for the sequence) and it was evaluated only once for me. Using .Publish in the described situation is pretty much a no-brainer in Rx land...
Sergey Aldoukhov
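For anyone trying to reproduce the Publish behaviour discussed in these comments, here is a minimal sketch. It assumes the System.Reactive package and uses the `Publish(selector)` overload, which shares one subscription to the source among all uses of the selector's parameter and connects on its own, so there is no separate Connect() call to mistime. `PublishDemo` and `Run` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishDemo
{
    // Returns how many times the Do side effect ran, plus the pairs that were produced.
    public static (int Evaluations, List<string> Pairs) Run()
    {
        var evaluations = 0;
        var pairsSeen = new List<string>();
        var debugSequence = Observable.Range(1, 4).Do(_ => evaluations++);

        // Both `shared` and `shared.Skip(1)` read from the same single subscription.
        var pairs = debugSequence.Publish(shared =>
            shared.Zip(shared.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }));

        pairs.Subscribe(p => pairsSeen.Add($"{p.Previous} -> {p.Current}"));
        return (evaluations, pairsSeen);
    }

    public static void Main()
    {
        var result = Run();
        result.Pairs.ForEach(Console.WriteLine);
        Console.WriteLine($"Source elements evaluated: {result.Evaluations}");
    }
}
```

Note that this shares the source within one subscription to `pairs`; subscribing to `pairs` a second time would subscribe to `debugSequence` again.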

See this link for an example of how to consume a sequence as pairs (prev, curr) of items:

Some good info there but it's specific to IEnumerable rather than IObservable.