Hi,

Somebody out there must have solved this already. Imagine I have a class that periodically raises an event when a value changes (e.g. PropertyChanged). That value is simply an amount of money.

Now I would like to use Rx to get the sum of the increases over the last 10 minutes. BufferWithTime, for example, doesn't help, because I always need the most recent 10 minutes at any given moment.

Any ideas how I can do this?

tia Martin

+1  A: 

The solution below uses Observable.Scan to keep the event data from the previous ten minutes in a list. The state is a list of tuples, each holding an int (the money amount) and a DateTime (the time the event was received).

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h);
var runningSums =
    events.Scan(new List<Tuple<int, DateTime>>(),
                (l, e) =>
                {
                    var now = DateTime.Now;
                    // Add last event data to list.
                    l.Add(Tuple.Create(e.EventArgs.Money, now));
                    // Return the correct part of the list (everything
                    // from the last ten minutes).
                    return l.Where(t => (now - t.Item2) <
                                   TimeSpan.FromMinutes(10)).ToList();
                })
          .Select(l => l.Sum(t => t.Item1));
runningSums.Subscribe(sum => Console.WriteLine(sum));

EDIT: Example that doesn't return a new list for every event:

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h);
var runningSums =
    events.Scan(Tuple.Create(new List<Tuple<int, DateTime>>(),
                             DateTime.Now - TimeSpan.FromMinutes(10)),
                (l, e) =>
                {
                    var now = DateTime.Now;
                    l.Item1.Add(Tuple.Create(e.EventArgs.Money, now));
                    // if (trimming-condition) then trim front of list...
                    return Tuple.Create(l.Item1, now - TimeSpan.FromMinutes(10));
                })
          .Select(l => l.Item1.Where(t => t.Item2 > l.Item2).Sum(t => t.Item1));
runningSums.Subscribe(sum => Console.WriteLine(sum));
Ronald Wildenberg
Not bad at all, and it does what it should. BUT you might want to consider whether it is really a good idea to return a new list for EVERY element processed.
pointernil
I'm not entirely happy with this either. But you must have a list of values with timestamps to be able to do this. And despite the mutable list, this is an entirely functional approach. What you could also do is keep the same list instead of creating a new one each time, and make the start time (always 10 minutes in the past) part of the state for the `Scan` method. However, the list then only keeps growing, so you'd have to find a way to trim the front. I added an example for this as well (without the trimming, but that can easily be added).
Ronald Wildenberg
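One way the trimming left out of the edited example could look, as a rough sketch rather than part of the answer above: keep the entries in a queue, subtract each amount from a running total as it falls out of the window, and drop it from the front. The class name `SlidingWindowSum` and its `Add` method are made up for illustration, and the sketch assumes events arrive in non-decreasing time order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not from the answers in this thread): keeps a running
// sum of the amounts whose timestamps fall inside a sliding time window.
public sealed class SlidingWindowSum
{
    // Oldest entry sits at the head of the queue.
    private readonly Queue<Tuple<int, DateTime>> _items =
        new Queue<Tuple<int, DateTime>>();
    private readonly TimeSpan _window;
    private long _sum;

    public SlidingWindowSum(TimeSpan window)
    {
        _window = window;
    }

    // Records 'amount' as seen at 'now' and returns the sum over the window.
    // Assumption: calls are made with non-decreasing 'now' values.
    public long Add(int amount, DateTime now)
    {
        _items.Enqueue(Tuple.Create(amount, now));
        _sum += amount;

        // Trim the front: drop entries that are a full window old or older,
        // mirroring the "(now - t.Item2) < TimeSpan" filter used above.
        while (_items.Count > 0 && now - _items.Peek().Item2 >= _window)
        {
            _sum -= _items.Dequeue().Item1;
        }

        return _sum;
    }
}
```

With Rx this could be plugged in as something like `events.Select(e => window.Add(e.EventArgs.Money, DateTime.Now))`, where `window` is a `SlidingWindowSum` created with `TimeSpan.FromMinutes(10)`. Passing the time in as a parameter keeps the class testable without waiting ten minutes. Like the answers above, the sum is only recomputed when a new event arrives.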
+1  A: 

Well, check out the following solution. It builds on the solution presented earlier here, but drops the purely functional style for the sake of efficiency (and readability, I think). It also reuses the built-in type Timestamped to track the timing...

cheers

    public static class RxExtensions
    {
        // Holds the timestamped values that fall inside the time window.
        class TimeLimitedList<T>
        {
            public List<Timestamped<T>> Values = new List<Timestamped<T>>();
            TimeSpan span;
            public TimeLimitedList(TimeSpan sp) { span = sp; }
            public void Add(Timestamped<T> v)
            {
                Values.Add(v);
                // Drop everything older than the window.
                Values.RemoveAll(a => a.Timestamp < (DateTime.Now - span));
            }
        }

        public static IObservable<List<Timestamped<TSource>>> SlidingWindow<TSource>(
            this IObservable<Timestamped<TSource>> source, TimeSpan slidingWindow)
        {
            return source
                .Scan0(new TimeLimitedList<TSource>(slidingWindow),
                       (acc, v) => { acc.Add(v); return acc; })
                .Select(a => a.Values);
        }
    }


    static void Main(string[] args)
    {
        var gen = Observable.Interval(TimeSpan.FromSeconds(0.25d)).Timestamp();
        gen.SlidingWindow(TimeSpan.FromSeconds(1)).Subscribe(slw =>
        {
            slw.ForEach(e => Console.WriteLine(e));
            Console.WriteLine("--------");
        });
        Console.ReadLine();
    }
pointernil