views:

123

answers:

4

In an IObservable sequence (in Reactive Extensions for .NET), I'd like to get the value of the previous and current elements so that I can compare them. I found an example online similar to below which accomplishes the task:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

It works fine except that it evaluates the sequence twice, which I would like to avoid. You can see that it is being evaluated twice with this code:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

The output shows twice as many of the debug lines as there are elements in the sequence.

I understand why this happens, but so far I haven't found an alternative that doesn't evaluate the sequence twice. How can I combine the previous and current with only one sequence evaluation?

A: 

If you only need to access the previous element during subscription, this is probably the simplest thing that will work. (I'm sure there's a better way, maybe a buffer operator on IObservable? The documentation is pretty sparse at the moment, so I can't really tell you.)

    EventArgs prev = null;

    sequence.Subscribe(curr => 
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        prev = curr;                              

    });

EventArgs is just a stand-in for the type of your event's argument.

Gurdas Nijor
Thanks for the suggestion. I do however need the previous element before subscription because I would like to filter the sequence based on logic using the previous and current.
dcstraw
Got it; i'll do a bit more research
Gurdas Nijor
+2  A: 

It turns out you can use a variable to hold the previous value and refer to it and reassign it within the chain of IObservable extensions. This even works within a helper method. With the code below I can now call CombineWithPrevious() on my IObservable to get a reference to the previous value, without re-evaluating the sequence.

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
dcstraw
Interesting, not immediately obvious that this works!
sixlettervariables
+2  A: 

Evaluating twice is an indicator of a Cold observable. You can turn it to a Hot one by using .Publish():

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();
Sergey Aldoukhov
Found an easy to digest article about the difference between Hot and Cold Observables: http://blogs.microsoft.co.il/blogs/bnaya/archive/2010/03/13/rx-for-beginners-part-9-hot-vs-cold-observable.aspx
Gurdas Nijor
Using Publish and Connect I still see the side effects of the Zip(Skip(1), ...) occur twice. I suppose it's not really the original source I needed to evaluate once, but the IObservable query itself. Still, thanks for the info.
dcstraw
@dcstraw Strange, I published the debugSequence from your example above (I used Observable.Range for the sequence) and it evaluated once for me. Using .Publish in the described situation is pretty much a no-brainer in the RX land...
Sergey Aldoukhov
A: 

See this link for an example of how to consume a sequence as pairs (prev, curr) of items: http://stackoverflow.com/questions/2768834/how-to-zip-one-ienumerable-with-itself

wageoghe
Some good info there but it's specific to IEnumerable rather than IObservable.
dcstraw