This might be a stupid question as I'm a bit new to RX :)

I'm sampling an event (RX for .Net 4.0):

eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

The problem is that the sampling time needs to be able to change on the fly. I guess I could make some property that removes the existing handler and creates a new one when it changes, but that seems a bit messy and more vulnerable to timing issues. Is there a way to simply change the interval?

Example: Say that someone is typing a string of characters. When a certain sequence is detected, you want to change the sampling time without missing an event, and preferably without getting the same event more than once.

A: 

I don't know of a way of changing the existing sampling interval, but what you could do is sample at the highest frequency you'll need, and then filter with a Where clause which uses a variable you can change.

For example:

static IObservable<T> SampleEvery<T>(this IObservable<T> source,
    Func<int> multipleProvider)
{
    int counter = 0;
    Func<T, bool> predicate = ignored => {
        counter++;
        if (counter >= multipleProvider())
        {
            counter = 0;
        }
        return counter == 0;
    };
    return source.Where(predicate);
}

You'd then call it like this:

// Keep this somewhere you can change it
int multiple = 1;

eventAsObservable.Sample(TimeSpan.FromSeconds(1))
                 .SampleEvery(() => multiple)
                 .Timestamp()
                 .Subscribe(x => Console.WriteLine("testing:" + 
                                                   x.Value.EventArgs.str));

Now, changing the value of multiple will change the effective sampling frequency.

It's a pretty ugly hack, but I think it should work.
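
To see what the counting rule does when the multiple changes mid-stream, here is a minimal sketch that runs the same predicate over a plain list of integers instead of an observable, so it needs no Rx reference. The names SampleEveryDemo, EveryNth and Run are made up for this illustration and are not part of Rx or of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SampleEveryDemo
{
    // Same counting rule as the SampleEvery predicate, packaged as a
    // standalone factory (hypothetical helper) so it can run without Rx.
    public static Func<T, bool> EveryNth<T>(Func<int> multipleProvider)
    {
        int counter = 0;
        return ignored =>
        {
            counter++;
            if (counter >= multipleProvider())
            {
                counter = 0;
            }
            return counter == 0;
        };
    }

    // Feeds ticks 1..10 through the predicate and switches the multiple
    // from 1 to 3 just before tick 5, simulating an on-the-fly change.
    public static List<int> Run()
    {
        int multiple = 1;
        Func<int, bool> predicate = EveryNth<int>(() => multiple);
        var passed = new List<int>();
        foreach (int tick in Enumerable.Range(1, 10))
        {
            if (tick == 5)
            {
                multiple = 3;
            }
            if (predicate(tick))
            {
                passed.Add(tick);
            }
        }
        return passed;
    }

    public static void Main()
    {
        // Prints: 1, 2, 3, 4, 7, 10
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

With a multiple of 1 every tick passes; after the switch to 3 only every third tick passes, and the count restarts from the last tick that passed. Note that the counter is plain shared state, so this sketch assumes the predicate is only ever called from one thread at a time.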

Jon Skeet
Are you missing a () on "if (counter >= multipleProvider)"?
Paul Betts
@Paul: Yes, oops. Fixing.
Jon Skeet
Seems like a viable solution, I'll give it some testing, thanks! I wonder what a good solution would be for scenarios like these... perhaps being able to pass in a method/lambda that returns a TimeSpan instead of the actual TimeSpan. It doesn't seem that far-fetched that you'd want to change the parameters of different operators on the fly.
MattiasK
A: 

Why don't you just subscribe twice?

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x))
).Subscribe(Console.WriteLine);

Or if searches are only active based on some sort of prefix or qualifier like Google Chrome's '?' operator:

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x)).SelectMany(x => doRemoteLookup(x))
).Subscribe(Console.WriteLine);
Paul Betts
The interval can be any value; it's customizable on a per-source basis. The best solution so far is creating a new subscription and then disposing of the old one, though there's a small chance of the event being triggered twice. It would be great if there were a way to access the actual property, though.
MattiasK
What I'm saying, though, is that you can just keep all of them running but toggle their output via a Where clause. Maybe I don't quite grok exactly what you're doing...
Paul Betts
A: 

I know this question has already been answered, but I thought I'd add another few ways of tackling it in an Rx way.

You could use Switch on a sequence of TimeSpan values:

private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();

sampleFrequencies
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
    .Switch()
    .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

// To change:
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));

Alternatively, it could also be solved using Defer, TakeUntil and Repeat (this one is a little crazier and is included as a thought exercise):

private TimeSpan sampleFrequency = TimeSpan.FromSeconds(2);
private Subject<Unit> frequencyChanged = new Subject<Unit>();

(Observable
    .Defer(() => eventAsObservable
        .Sample(Observable.Interval(sampleFrequency)))
    .Timestamp()
    .TakeUntil(frequencyChanged)
).Repeat()
.Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

// To change: 
// sampleFrequency = TimeSpan.FromSeconds(5);
// frequencyChanged.OnNext(new Unit());
Richard Szalay
Actually, I ended up simply writing a property that creates a new subscription and then disposes of the old one. I think there's a small chance of it firing an event twice, but the risk of introducing bugs/overhead with the other methods makes it the most attractive solution. I might, for instance, want to change Sample to Throttle. The other solutions seemed a little too much of a hack, but I appreciate the help!
MattiasK
Both of my solutions still use Sample, so either could easily be changed to use Throttle instead (the timestamp stuff was taken directly from your requirements). The Switch version does pretty much what you are doing now (cancelling, restarting), but from within Switch.
Richard Szalay