The example below was my attempt at doing this:

    var source
        = Observable.Sample(
              Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));

But when I .Subscribe() to that observable and write the values to the console, I get a sequence like this, one line every 2 seconds:

OnNext: 312969
OnNext: 584486
OnNext: 862009

Obviously the .Range() observable keeps running while .Sample() waits 2 seconds between outputs, so most values are skipped. I would like to know how to create an observable that does not skip any values, so the output would look like this:

OnNext: 1
OnNext: 2
OnNext: 3

That is, one value from .Range() every 2 seconds. How can I accomplish this with the Reactive Extensions for .NET?

+1  A: 

I approached this recently by creating an Observable that emits timer events at a fixed interval. You can then use the Zip method to synchronize the events from your Observable with those of the timer Observable.

For instance:

    var timer = 
        Observable
            .Timer(
                TimeSpan.FromSeconds(0), 
                TimeSpan.FromSeconds(2)
            );
    var source = Observable.Range(1, int.MaxValue);
    var timedSource = source.Zip(timer,(s,t)=>s);
    timedSource.Subscribe(Console.WriteLine);
spender
Works well, thank you. I wrapped it up as my own extension method called Pace(..)
ZeroBugBounce
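
The comment above mentions a Pace(..) extension method but does not show it. A minimal sketch of what such a wrapper might look like follows. The signature, the class name and the `period` parameter are assumptions, and the code assumes the `System.Reactive` NuGet package with its `System.Reactive.Linq` namespace:

```csharp
using System;
using System.Reactive.Linq;

public static class ObservablePacingExtensions
{
    // Hypothetical signature: the thread names Pace(..) but never shows its code.
    public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan period)
    {
        // Ticks once immediately, then once per period.
        var ticks = Observable.Timer(TimeSpan.Zero, period);

        // Zip pairs the n-th source value with the n-th tick,
        // so values are delayed rather than dropped.
        return source.Zip(ticks, (value, _) => value);
    }
}

public static class Program
{
    public static void Main()
    {
        using (Observable.Range(1, 5)
            .Pace(TimeSpan.FromSeconds(2))
            .Subscribe(x => Console.WriteLine("OnNext: " + x)))
        {
            // Keep the process alive while the timer ticks.
            Console.ReadLine();
        }
    }
}
```

One thing to keep in mind with this design: Zip has to hold on to values from the faster side until the slower side catches up, so a very long or unbounded fast source can queue up a lot of items in memory.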
+1  A: 

Using Observable.GenerateWithTime:

    var source = Observable.GenerateWithTime(1, _ => true, x => ++x, x => x, x => TimeSpan.FromSeconds(2));

Observable.Range uses Observable.Generate, so this is one approach. There could be many other ways.
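
As far as I know, later releases of Rx no longer have a separate GenerateWithTime method, and the same behaviour is available as an overload of Observable.Generate that takes a time selector as its last argument. A sketch under that assumption (it presumes the `System.Reactive` NuGet package, and the `Program` wrapper is only there to make it runnable):

```csharp
using System;
using System.Reactive.Linq;

public static class Program
{
    public static void Main()
    {
        var source = Observable.Generate(
            1,                              // initial state
            _ => true,                      // condition: never stop on its own
            x => x + 1,                     // next state
            x => x,                         // value to emit for each state
            _ => TimeSpan.FromSeconds(2));  // time selector evaluated per value

        using (source.Subscribe(x => Console.WriteLine("OnNext: " + x)))
        {
            // Keep the process alive while values are produced.
            Console.ReadLine();
        }
    }
}
```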

For something more advanced, such as pacing events in the same manner (this approach obviously only helps if you are generating the data yourself), see http://stackoverflow.com/questions/3211134/how-to-throttle-event-stream-using-rx, which deals with that problem and has a solution.

Richard Hein
Thank you, this is also an interesting method to know.
ZeroBugBounce