I have an IObservable that produces values at random intervals and I want to throttle this sequence. One thing I have found out is that the Throttle operator's definition of "throttling" is not the same as mine.

Throttle only produces a value once the specified interval has elapsed in silence (it then produces the last value seen). I thought throttling would mean producing values at the specified interval (unless there's silence, of course).

For example, I expected Observable.Interval(100).Select((_,i) => i).Throttle(200) to produce (modulo any performance/timing issues) the even numbers, since I am throttling it to "half-speed". However, that sequence produces no values at all, because there is never a period of silence of length 200.

So, I discovered that Sample actually does the "throttling" behavior I want. Observable.Interval(100).Select((_,i) => i).Sample(200) produces (again, modulo any performance/timing issues) the sequence of even numbers.

However, I have one other problem: the interval varies, depending on the last "sampled" value. What I want is to write an operator that looks like this:

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);

The intervalSelector parameter produces the interval for the next sample, and the first sample... is either taken at the first value or from an additional parameter, I don't care.

I tried writing this, but I ended up with a large, convoluted construction that did not work quite right. My question is: can I build this using the existing operators (i.e., with a one-liner)?

A: 

Isn't what you're looking for Observable.BufferWithTime?

Paul Betts
BufferWithTime suffers from the same defect as the others: the time interval is constant. I need to compute how long to wait to take the next sample from the last sampled value. I'll see if I can draw a marble diagram for this...
Martinho Fernandes
A: 

Many hours later, and with some sleep on it, I got it.

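public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
    // Accumulator: (time left until the next sample, emit this value?, value)
    return source.TimeInterval()
                 .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
                 {
                     if (v.Interval >= acc.Item1)
                     {
                         // The wait is over: sample this value and compute the next wait from it.
                         return Tuple.Create(intervalSelector(v.Value), true, v.Value);
                     }
                     // Still waiting: subtract the time that passed since the previous value.
                     return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
                 })
                 .Where(t => t.Item2)
                 .Select(x => x.Item3);
}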

This works as I want: each time it produces a value x, it stops producing values until intervalSelector(x) has elapsed.
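
A rough way to check the accumulator logic without Rx or real timers is to replay the same step over a hand-written list of (gap, value) pairs, where the gap stands in for what TimeInterval() would report. This is only a sketch: the names SampleTrace and Replay are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

public static class SampleTrace
{
    // Hypothetical helper: applies the same rule as the Scan step above to
    // (gap, value) pairs, where gap is the time since the previous element.
    public static List<T> Replay<T>(
        IEnumerable<(TimeSpan Gap, T Value)> items,
        Func<T, TimeSpan> intervalSelector)
    {
        var emitted = new List<T>();
        var remaining = TimeSpan.Zero; // time left before the next sample may be taken

        foreach (var (gap, value) in items)
        {
            if (gap >= remaining)
            {
                // The wait is over: take this value and start a new wait based on it.
                emitted.Add(value);
                remaining = intervalSelector(value);
            }
            else
            {
                // Still waiting: count down by the time that has passed.
                remaining -= gap;
            }
        }
        return emitted;
    }
}
```

Replaying the values 0 to 9 spaced 100 ms apart with a constant 200 ms selector yields 0, 2, 4, 6, 8, which matches the even-number expectation from the question. With a selector of (x + 1) * 100 ms the same input yields 0, 1, 3, 7, since each sampled value lengthens the next wait.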

Martinho Fernandes