I'm trying to get my head around the right use cases for Reactive Extensions (Rx). The examples that keep coming up are UI events (drag and drop, drawing), along with suggestions that Rx is suitable for asynchronous applications/operations such as web service calls.

I'm working on an application where I need to write a tiny client API for a REST service. I need to call four REST end-points, three to get some reference data (Airports, Airlines, and Statuses), and the fourth is the main service that will give you flight times for a given airport.

I have created classes exposing the three reference data services, and the methods look something like this:

public IObservable<IEnumerable<Airport>> GetAirports()
public IObservable<IEnumerable<Airline>> GetAirlines()
public IObservable<IEnumerable<Status>> GetStatuses()
public IObservable<IEnumerable<Flight>> GetFlights(string airport)

In my GetFlights method I want each Flight to hold a reference to the Airport it's departing from and the Airline operating the flight. To do that I need the data from GetAirports and GetAirlines to be available. Each Airport, Airline and Status will be added to a Dictionary (i.e. Dictionary<string, Airport>) so that I can easily set the reference when parsing each flight.

flight.Airport = _airports[flightNode.Attribute("airport").Value];
flight.Airline = _airlines[flightNode.Attribute("airline").Value];
flight.Status = _statuses[flightNode.Attribute("status").Value];
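
As a rough sketch of how one of those lookups could be built: the Code and Name properties on Airport and the ReferenceData helper are assumptions for illustration, since the real classes are not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of the reference type; the real Airport class is not shown.
public sealed class Airport
{
    public string Code { get; set; }
    public string Name { get; set; }
}

public static class ReferenceData
{
    // Index the airports by code so each parsed flight can resolve
    // its airport with a single dictionary lookup.
    public static Dictionary<string, Airport> IndexByCode(IEnumerable<Airport> airports)
    {
        return airports.ToDictionary(a => a.Code);
    }
}
```

Note that ToDictionary throws if two airports share a code, and the indexer throws if a flight refers to a code that was never loaded, so TryGetValue may be the safer lookup when parsing.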

My current implementation now looks like this:

public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
    var airports = new AirportNamesService().GetAirports();
    var airlines = new AirlineNamesService().GetAirlines();
    var statuses = new StatusService().GetStatuses();


    var referenceData = airports
        .ForkJoin(airlines, (allAirports, allAirlines) =>
                            {
                                Airports.AddRange(allAirports);
                                Airlines.AddRange(allAirlines);
                                return new Unit();
                            })
        .ForkJoin(statuses, (nothing, allStatuses) =>
                            {
                                Statuses.AddRange(allStatuses);
                                return new Unit();
                            });

    string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);

    var flights = from data in referenceData
                    from flight in GetFlightsFrom(url)
                    select flight;

    return flights;
}

private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
    return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}

The current implementation is based on Sergey's answer, and uses ForkJoin to ensure that the reference data gets loaded before the flights are requested. This implementation is a lot more elegant than having to fire a "ReferenceDataLoaded" event like my previous implementation did.

+2  A: 

I think that if you are receiving a list of entities from each REST call, your call should have a slightly different signature: you are not observing each value in the returned collection, you are observing the completion of the call. So for airports, the signature should be:

public IObservable<Airports> GetAirports()

The next step would be to run first three in parallel and wait on all of them:

var ports_lines_statuses = 
    Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());

The third step would be to compose the above observable with GetFlights():

var decoratedFlights = 
  from pls in ports_lines_statuses
  let airport = MyAirportFunc(pls)
  from flight in GetFlights(airport)
  select flight;

EDIT: I still do not understand why your services return

IObservable<Airport> 

instead of

IObservable<IEnumerable<Airport>>

AFAIK, from the REST call you get all entities at once, but maybe you do paging? Anyway, if you want Rx to do the buffering you could use .BufferWithCount():

    var allAirports = new AirportNamesService()
        .GetAirports().BufferWithCount(int.MaxValue); 
...

Then you can apply ForkJoin:

var ports_lines_statuses =  
    allAirports
        .ForkJoin(allAirlines, PortsLinesSelector)
        .ForkJoin(statuses, ...

ports_lines_statuses would contain a single event on the timeline which would contain all the reference data.
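
A minimal sketch of that idea against a current System.Reactive package, swapping in Observable.Zip for ForkJoin. For sources that each emit exactly one list and then complete, Zip yields the same single combined event. The ReferenceSets and ReferenceLoader names are hypothetical, and string lists stand in for the real entity types.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical container for the three reference data sets.
public sealed class ReferenceSets
{
    public IList<string> Airports { get; set; }
    public IList<string> Airlines { get; set; }
    public IList<string> Statuses { get; set; }
}

public static class ReferenceLoader
{
    // Assumes each source emits one list and then completes.
    // The result emits one ReferenceSets value once all three have arrived.
    public static IObservable<ReferenceSets> LoadAll(
        IObservable<IList<string>> airports,
        IObservable<IList<string>> airlines,
        IObservable<IList<string>> statuses)
    {
        return Observable.Zip(airports, airlines, statuses,
            (p, l, s) => new ReferenceSets { Airports = p, Airlines = l, Statuses = s });
    }
}
```

Passing the combined value down the pipeline this way avoids mutating shared fields inside the selectors.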

EDIT: Here's another one, using the freshly minted ListObservable (latest release only):

allAirports = airports.Start();
allAirlines = airlines.Start();
allStatuses = statuses.Start();

...
whenReferenceDataLoaded =
  Observable.Join(airports.WhenCompleted()
                 .And(airlines.WhenCompleted())
                 .And(statuses.WhenCompleted())
                 .Then((p, l, s) => new Unit()));



    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => new Unit());
    }
Sergey Aldoukhov
I actually do want to get all Airlines, Airports and Statuses in "one batch" first, because when I get Flights I need those three reference data sets to be present so that I can link them to a Flight. So I need to get Airports into a dictionary like Dictionary<string, Airport>, so that I can do: flight.Airport = airports[flightXml.AirportCode].
Jonas Follesø
I updated the question with new method signatures. You were right, I actually do want to get all Airports, Airlines and Statuses at once. Am I correct that PortsLinesSelector is a method combining airports and airlines, and that I then need a second method to combine the previous result with the new result? I tried downloading the latest release of Rx for Silverlight 3/4, but couldn't find a Start() method on Observable (only StartWith).
Jonas Follesø
@jonas-folleso Yes, PortsLinesSelector is something like (ports, lines) => new { ports, lines }, and the second selector would attach the third result to this one. The idea here is to keep to the functional style as much as possible and just pass the data through the pipeline instead of using local variables. Start() on Observable is only in the .NET 4 release, so you would have to wait until it is ported to the others...
Sergey Aldoukhov
Okay, cool. You definitely pointed me in the right direction, and as I got something that works pretty well I'll mark this one as answered. Thanks a lot!
Jonas Follesø
A: 

The use case here is pull based, so IEnumerable is fine. If you want to, say, notify when a new flight comes in, then wrapping a pull-based REST call within Observable.Generate might be of some value.
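
One possible shape for that kind of polling, sketched with Observable.Interval for the timing rather than Observable.Generate. The FlightPolling class and the fetchFlights delegate are hypothetical stand-ins for the real pull-based call, and strings stand in for flight objects.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FlightPolling
{
    // fetchFlights is a hypothetical stand-in for the pull-based REST call.
    // Emits one snapshot per tick; the first tick fires after one full period.
    public static IObservable<IList<string>> Poll(
        Func<IList<string>> fetchFlights, TimeSpan period)
    {
        return Observable.Interval(period)
                         .Select(_ => fetchFlights());
    }
}
```

The sequence never completes on its own, so the subscriber is expected to dispose the subscription (or apply Take) when it no longer wants updates.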

Scott Weinstein
So Rx is not a good approach for building a REST client in my scenario? Since this is WP7 I cannot make it synchronous, so the alternative would be to add GetAirlinesAsync and a GetAirlinesCompleted event. I would then have to call GetAirlinesAsync, GetAirportsAsync and GetStatusesAsync, and wait for all three callback events to fire before calling GetFlights..? I was also planning to extend my method to make it re-call the GetFlights service every 3 min to refresh. Hence observing new flight objects as they arrive sounds like a good idea..?
Jonas Follesø
If the underlying API is async only, then Rx makes a lot more sense. Observable.Generate...
Scott Weinstein