views:

176

answers:

3

Update

Looks like Jon Skeet was right (big surprise!) and the issue was with my assumption about the Average extension providing a continuous average (it doesn't).

For the behavior I'm after, I wrote a simple ContinuousAverage extension method, the implementation of which I am including here for the benefit of others who may want something similar:

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

        // undecided whether this method needs to be made thread-safe or not
        // seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta / (double)(++_count));
            return _mean;
        }
    }

    public static IObservable<double> ContinousAverage(this IObservable<double> source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

I'm thinking of going ahead and doing something like the above for the other obvious candidates as well -- so, ContinuousCount, ContinuousSum, ContinuousMin, ContinuousMax ... perhaps ContinuousVariance and ContinuousStandardDeviation as well? Any thoughts on that?


Original Question

I use Rx Extensions a little bit here and there, and feel I've got the basic ideas down.

Now here's something odd: I was under the impression that if I wrote this:

var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

I would get two IObservable<double> objects (bids and avgOfBids); one would basically be a stream of all the market bids from my MarketDataProvider, the other would be a stream of the average of these bids.

So something like this:

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

It seems that my avgOfBids object isn't doing anything. What am I missing? I think I've probably misunderstood what Average is actually supposed to do. (This also seems to be the case for all of the aggregate-like extension methods on IObservable<T> -- e.g., Max, Count, etc.)

+1  A: 

After subscribing to the bids event stream; you've effectively blocked any other observers from subscribing (Subscribe returns an IDisposable.)

You'll have to define another Observable in parallel with ticks, and subscribe to that one to average it.

var ticksToAverage = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

 var bidsToAverage = ticksToAverage
      .Where(e => e.EventArgs.Quote.HasBid)
         .Select(e => e.EventArgs.Quote.Bid);


var avgOfBids = bidsToAverage.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe( b => Console.WriteLine("Avg Bid: {0}", b)
                ); 
Pierreten
Why would subscribing block? I'm willing to admit that Rx regularly confuses me, but I can't see why that would be the case. I believe the problem is entirely different here.
Jon Skeet
I was also under the impression that Average provides a running average; based on your answer that seems to be wrong. As far as blocking, you could very well be correct; since Rx uses regular CLR events under the hood; and multiple parties can subscribe to an event without blocking.
Pierreten
@Pierreten: Looks like Jon's answer was the right one in this case. Thanks for your input, though; I appreciate it!
Dan Tao
@Pierreten - FYI, the only part of Rx that uses CLR is `Observable.FromEvent`
Richard Szalay
+2  A: 

The aggregate methods don't provide rolling max/min/count/average etc - they provide a single value when the original stream completes. So for example, if you change your code to:

var avgOfBids = bids.Take(5).Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

Then once 5 bids have been made, it will display the average of them. The "average" stream will then complete too.

Jon Skeet
Jon, do you know of a decent source of documentation for Rx? I've looked through MSDN and the Rx team's blog to no avail.
Pierreten
@Pierreten: Nope. I use the CHM files which come with it, and the forum for questions.
Jon Skeet
@Jon: Thanks a lot! I suppose that does make sense; I am with Pierreten in wishing MSDN had more documentation on these methods, though. In any case, I was able to get the behavior I wanted by writing a simple implementation myself. Do you happen to know if something like what I've written (in my updated answer) already exists somewhere? A continuous analytics library, of sorts?
Dan Tao
@Dan: I believe you could get away with just using Scan and Select if you were cunning... make Scan return a tuple of (count, total) and then select total / count.
Jon Skeet
+2  A: 

Another way to do ContinousAverage would be to use a .Scan():

bids.Scan(new { sum = .0, count = 0 }, 
          (agg, x) => new { sum = agg.sum + x, count = agg.count + 1 })
    .Select(agg => agg.sum / agg.count)
Sergey Aldoukhov
Yep, this is indeed a clever way to get it. Jon Skeet suggested the same thing in one of his comments. I'm still thinking it'd be handy to have a `ContinuousStandardDeviation` method, though, or perhaps something like `ContinuousStatistics` that returns an `IObservable<Statistics>` where `Statistics` is a `struct` including mean, variance, standard deviation, etc.
Dan Tao
@Dan: Take a look at this implementation: http://weblogs.asp.net/sweinstein/archive/2010/01/02/streaming-olap-with-the-reactive-extensions-rx-for-net-part-1.aspx
Pierreten