Yesterday I watched the screencast Writing your first Rx Application (on Channel 9) where Wes Dyer shows how to implement Drag 'n' Drop using Reactive Extensions (Rx). Something that I still don't understand:

Towards the end of the screencast, Wes Dyer types in the following:

var q = from start in mouseDown
        from delta in mouseMove.StartsWith(start).Until(mouseUp)
                       .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                           new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select delta;

Briefly, q is an observable that pushes the mouse move coordinate deltas to its subscribers.

What I don't understand is how the mm.Zip(mm.Skip(1), ...) can possibly work!?

As far as I know, IObservable is not enumerable in the sense that IEnumerable is. Thanks to the "pull" nature of IEnumerable, it can be iterated over again and again, always yielding the same items. (At least this should be the case for all well-behaved enumerables.) IObservable works differently. Items are pushed to the subscribers once, and that was it. In the above example, mouse moves are single incidents which cannot be repeated without having been recorded in-memory.

So, how can the combination of .Zip with .Skip(1) possibly work, since the mouse events they're working on are single, non-repeatable incidents? Doesn't this operation require that mm is "looked at" independently twice?


For reference, here's the method signature of Observable.Zip:

public static IObservable<TResult> Zip <TLeft, TRight, TResult>
(
    this IObservable<TLeft>       leftSource,     //  = mm
    IObservable<TRight>           rightSource,    //  = mm.Skip(1)
    Func<TLeft, TRight, TResult>  selector
)

P.S.: I just saw that there's another screencast on the Zip operator which is quite insightful.

+1  A: 

Items are pushed to the subscribers once, and that was it.

Yes, one item is pushed once, but the item is one of a 'sequence' of events. The sequence is still a sequence. That's why Skip works - it skips one item and then, when the next one comes, processes it (doesn't skip it).

Alex Paven
+2  A: 

Aha! The Zip screencast that I mentioned in the P.S. gave me a vital clue: Zip "remembers" items to account for the fact that items may arrive from one observable sooner than from the other. I'll attempt an answer to my question, I hope someone can correct me if I'm wrong.

Zip pairs up inputs from two observable sequences like this (letters and digits are "events"):

mm                        ----A---------B-------C------D-----------E----->
                              |         |       |      |           |
                              |         |       |      |           |
mm.Skip(1)                ----+---------1-------2------3-----------4----->
                              |         |       |      |           |
                              |         |       |      |           |
mm.Zip(mm.Skip(1), ...)   ----+--------A,1-----B,2----C,3---------D,4---->

And it indeed has to do internal buffering. In the code that I posted, mm is the real, "live" observable. mm.Skip(1) is something like a state machine derived from it. Alex Paven's answer briefly explains how this works.

So, mm.Zip(mm.Skip(1), ...) does indeed look at mm twice, once directly, and once through the Skip(n) filter. And because observables aren't repeatable sequences, it does internal buffering to account for the fact that one sequence will yield items sooner than the other.

(I quickly glanced at the Rx source with .NET Reflector and indeed, Zip involves a Queue.)
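To make the buffering idea concrete, here is a minimal sketch of the pairing logic. It is not the actual Rx implementation: `ZipBuffer`, `OnLeft`, `OnRight` and `ZipBufferDemo` are hypothetical names I made up for illustration, and the sketch ignores completion, errors and thread safety. It only shows how two queues are enough to pair the n-th item of one sequence with the n-th item of the other.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (NOT the Rx source): pairs the n-th left item
// with the n-th right item, buffering whichever side is ahead.
public sealed class ZipBuffer<TLeft, TRight, TResult>
{
    private readonly Queue<TLeft> _left = new Queue<TLeft>();
    private readonly Queue<TRight> _right = new Queue<TRight>();
    private readonly Func<TLeft, TRight, TResult> _selector;
    private readonly Action<TResult> _onNext;

    public ZipBuffer(Func<TLeft, TRight, TResult> selector, Action<TResult> onNext)
    {
        _selector = selector;
        _onNext = onNext;
    }

    // Called when the left source pushes an item.
    public void OnLeft(TLeft value)
    {
        _left.Enqueue(value);
        Drain();
    }

    // Called when the right source pushes an item.
    public void OnRight(TRight value)
    {
        _right.Enqueue(value);
        Drain();
    }

    // Emit a result for every complete (left, right) pair available.
    private void Drain()
    {
        while (_left.Count > 0 && _right.Count > 0)
        {
            _onNext(_selector(_left.Dequeue(), _right.Dequeue()));
        }
    }
}

public static class ZipBufferDemo
{
    public static List<string> Run()
    {
        var results = new List<string>();
        var zip = new ZipBuffer<string, string, string>(
            (prev, cur) => prev + "," + cur,
            results.Add);

        bool firstSeen = false;
        foreach (var e in new[] { "A", "B", "C", "D", "E" })
        {
            zip.OnLeft(e);                 // plays the role of mm: sees every event
            if (firstSeen) zip.OnRight(e); // plays the role of mm.Skip(1): drops only the first
            firstSeen = true;
        }

        return results; // "A,B", "B,C", "C,D", "D,E"
    }
}
```

In this scenario the left queue never holds more than the one "previous" event once the right side starts delivering, which is why the technique is cheap for consecutive-pair computations like the mouse deltas.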

stakx
See also the marble diagram here: http://wiki.github.com/richardszalay/rxas/zip
Richard Szalay
+2  A: 

Doesn't this operation require that mm is "looked at" independently twice?

That's in fact the answer to your question: you can subscribe to the same IObservable sequence multiple times.

mm.Skip(1) subscribes to mm and hides the first value from its own subscribers. Zip is a subscriber of both mm.Skip(1) and mm. Because mm has always yielded one more value than mm.Skip(1), Zip internally buffers the latest mousemove event from mm in order to zip it with the next mousemove event that arrives. The selector function can then compute the delta between the two.
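A small sketch of this, assuming the System.Reactive NuGet package is referenced. The `Subject` stands in for the mouse-move observable, and the coordinates and the `ZipSkipDemo` name are invented for illustration; the `Let` operator from the screencast is left out because a local variable does the same job here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipSkipDemo
{
    public static List<(int X, int Y)> Run()
    {
        // Assumption: a Subject plays the role of the hot mouse-move source.
        var mm = new Subject<(int X, int Y)>();
        var deltas = new List<(int X, int Y)>();

        // Zip subscribes to mm twice: once directly, once through Skip(1).
        var query = mm.Zip(
            mm.Skip(1),
            (prev, cur) => (X: cur.X - prev.X, Y: cur.Y - prev.Y));

        using (query.Subscribe(d => deltas.Add(d)))
        {
            mm.OnNext((10, 10)); // buffered by Zip, dropped by Skip(1)
            mm.OnNext((13, 11)); // paired with (10, 10) -> delta (3, 1)
            mm.OnNext((18, 9));  // paired with (13, 11) -> delta (5, -2)
        }

        return deltas;
    }
}
```

Both subscriptions are set up before the first event is pushed, so neither of them misses anything that the other one sees.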

Another thing you should notice (and this is the real answer to the title of your question) is that the IObservable created by Observable.FromEvent is a hot observable and therefore not repeatable. But there are cold observables which are in fact repeatable, like Observable.Range(0,10). In the latter case each subscriber will receive the same 10 events, because they are generated independently for each subscriber. For mousemove events this is not the case (you won't get mouse move events from the past). But because Zip subscribes to the left and right sequences at the same time, both subscriptions most likely see the same events in this case.
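The difference can be sketched like this, again assuming the System.Reactive NuGet package. The `Subject` is only a stand-in for a hot event source such as mouse moves, and `HotColdDemo` and the list names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotColdDemo
{
    public static (List<int> ColdA, List<int> ColdB, List<int> HotEarly, List<int> HotLate) Run()
    {
        // Cold: the values are generated anew for each subscriber.
        var cold = Observable.Range(0, 3);
        var coldA = new List<int>();
        var coldB = new List<int>();
        cold.Subscribe(x => coldA.Add(x));
        cold.Subscribe(x => coldB.Add(x)); // receives 0, 1, 2 again

        // Hot: events happen whether or not anyone is listening.
        var hot = new Subject<int>();
        var hotEarly = new List<int>();
        var hotLate = new List<int>();
        hot.Subscribe(x => hotEarly.Add(x));
        hot.OnNext(1);                      // only the early subscriber sees this
        hot.Subscribe(x => hotLate.Add(x));
        hot.OnNext(2);                      // both subscribers see this
        hot.OnCompleted();

        return (coldA, coldB, hotEarly, hotLate);
    }
}
```

Each cold subscriber ends up with 0, 1, 2, while the late hot subscriber only gets the 2 and has no way to recover the 1 it missed.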

P.S.: You can also create a hot / non-repeatable IEnumerable: it does not need to return the same values for each enumerator. You could for instance create an IEnumerable which waits until a mousemove event occurs and then yields the event. In this case the enumerator would always block (bad design), but it would be possible. ;)

Nappy
@Nappy, thanks for explaining that "hot" vs. "cold" bit of terminology. I was lacking the proper vocabulary when I asked the question; this makes things even clearer now.
stakx