The Reactive Extensions come with a lot of helper methods for turning existing events and asynchronous operations into observables, but how would you implement an IObservable<T> from scratch?

IEnumerable<T> has the lovely yield keyword, which makes it very simple to implement.

What is the proper way of implementing IObservable<T>?

Do I need to worry about thread safety?

I know there is support for getting called back on a specific synchronization context, but is this something I, as an IObservable<T> author, need to worry about, or is it somehow built in?

Update:

Here's my C# version of Brian's F# solution:

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            // Guard against double disposal: OnCompleted throws once isDisposed is set.
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}
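If you would rather not take a dependency on FSharp.Core just for its immutable map, the same snapshot idea can be sketched with copy-on-write arrays from the base class library. `SnapshotObservable<T>`, `ListObserver<T>` and `SnapshotDemo` are hypothetical names, not Rx types, and the sketch deliberately covers only subscription management: it does not enforce the OnNext* (OnError | OnCompleted)? grammar or guard against overlapping notifications.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: subscribers live in an array that is never modified in place.
// Subscribe/unsubscribe copy-and-swap under a lock; notifications read the current
// array once and iterate that snapshot without locking.
public class SnapshotObservable<T> : IObservable<T>
{
    private readonly object gate = new object();
    private IObserver<T>[] observers = new IObserver<T>[0];

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        lock (gate)
        {
            var copy = new IObserver<T>[observers.Length + 1];
            Array.Copy(observers, copy, observers.Length);
            copy[observers.Length] = observer;
            Volatile.Write(ref observers, copy);
        }
        return new Subscription(this, observer);
    }

    private void Unsubscribe(IObserver<T> observer)
    {
        lock (gate)
        {
            int i = Array.IndexOf(observers, observer);
            if (i < 0) return; // not (or no longer) subscribed
            var copy = new IObserver<T>[observers.Length - 1];
            Array.Copy(observers, 0, copy, 0, i);
            Array.Copy(observers, i + 1, copy, i, observers.Length - i - 1);
            Volatile.Write(ref observers, copy);
        }
    }

    public void OnNext(T value) { foreach (var o in Volatile.Read(ref observers)) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in Volatile.Read(ref observers)) o.OnError(error); }
    public void OnCompleted() { foreach (var o in Volatile.Read(ref observers)) o.OnCompleted(); }

    private sealed class Subscription : IDisposable
    {
        private SnapshotObservable<T> parent;
        private readonly IObserver<T> observer;

        public Subscription(SnapshotObservable<T> parent, IObserver<T> observer)
        {
            this.parent = parent;
            this.observer = observer;
        }

        // Interlocked.Exchange makes Dispose idempotent: only the first call unsubscribes.
        public void Dispose() => Interlocked.Exchange(ref parent, null)?.Unsubscribe(observer);
    }
}

// Demo observer that records what it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) => Log.Add("next " + value);
    public void OnError(Exception error) => Log.Add("error " + error.Message);
    public void OnCompleted() => Log.Add("completed");
}

public static class SnapshotDemo
{
    public static (string A, string B) Run()
    {
        var source = new SnapshotObservable<int>();
        var a = new ListObserver<int>();
        var b = new ListObserver<int>();
        IDisposable subA = source.Subscribe(a);
        source.Subscribe(b);
        source.OnNext(1);
        subA.Dispose();
        subA.Dispose(); // second call is a no-op
        source.OnNext(2);
        source.OnCompleted();
        return (string.Join(", ", a.Log), string.Join(", ", b.Log));
    }
}
```

`SnapshotDemo.Run()` returns `("next 1", "next 1, next 2, completed")`: the first observer stops receiving once its subscription is disposed, and the second Dispose call does nothing. Because each notification iterates a snapshot, an observer that unsubscribes from inside OnNext does not disturb the loop it is called from.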
A:
  1. Crack open Reflector and have a look.

  2. Watch some C9 videos; this one shows how you can 'derive' the Select 'combinator'.

  3. The secret is to create AnonymousObservable, AnonymousObserver and AnonymousDisposable classes (which are just workarounds for the fact that you can't instantiate interfaces). They contain no implementation of their own; you pass that in as Action and Func delegates.

For example:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

I'll let you work out the rest... it's a very good exercise in understanding.

There's a nice little thread growing here with related questions.

Benjol
Thanks, but that's not really helpful. I have been through both Reflector and most of the C9 videos already. Reflector only shows the actual implementation, and it's very hard to deduce rules concerning threading and such from it. Also, your so-called secret just pushes the responsibility for a correct implementation from the actual observable class to the supplied Func; it doesn't reveal the rules for implementing that Func. So basically you told me nothing except to figure out the rest by myself :)
jesperll
Point taken. To be honest, most of my efforts so far have gone into writing what they call 'combinators', as opposed to actual sources. You can glean a few guidelines from the answers to my question here (the best place for getting 'official' answers at the moment): http://social.msdn.microsoft.com/Forums/en-US/rx/thread/79402dd3-009a-46db-9b55-06482e8cad0e
Benjol
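Benjol leaves AnonymousObserver and AnonymousDisposable as an exercise; one possible way to fill them in is sketched below, together with a tiny source built on AnonymousObservable. It is an illustration, not the Rx implementation, and `AnonymousDemo` and its `Range` source are hypothetical. As jesperll notes, the subscribe function still carries all the responsibility: it must follow the Rx grammar (any number of OnNext calls, then at most one OnError or OnCompleted) and return something that actually cancels the work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Benjol's AnonymousObservable<T>, repeated so this block compiles on its own.
public class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

// An observer whose three callbacks are supplied as delegates.
public class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

// A disposable that runs its action at most once, however many times Dispose is called.
public class AnonymousDisposable : IDisposable
{
    private Action _dispose;
    public AnonymousDisposable(Action dispose) { _dispose = dispose; }
    public void Dispose() { Interlocked.Exchange(ref _dispose, null)?.Invoke(); }
}

public static class AnonymousDemo
{
    // Hypothetical cold source: on each Subscribe, pushes 1..count synchronously, then completes.
    public static IObservable<int> Range(int count)
    {
        return new AnonymousObservable<int>(observer =>
        {
            for (int i = 1; i <= count; i++) observer.OnNext(i);
            observer.OnCompleted();
            return new AnonymousDisposable(() => { }); // nothing left to cancel
        });
    }

    public static string Run()
    {
        var log = new List<string>();
        Range(3).Subscribe(new AnonymousObserver<int>(
            x => log.Add(x.ToString()),
            e => log.Add("error: " + e.Message),
            () => log.Add("done")));
        return string.Join(" ", log);
    }
}
```

`AnonymousDemo.Run()` returns `"1 2 3 done"`. Because `Range` finishes all its work inside Subscribe, the disposable it returns has nothing left to cancel; a source that produces values later would return a disposable that stops it. Released versions of Rx expose this same pattern as `Observable.Create`.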
A:

Honestly, I am not sure how 'right' all this is, but it feels pretty good based on my experience so far. It's F# code, but hopefully you get a sense of the flavor. It lets you 'new up' a source object on which you can then call Next/Completed/Error; it manages subscriptions and tries to Assert when the source or clients do bad things.

open System
open System.Diagnostics

type ObservableSource<'T>() =
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

I'd be interested in any thoughts about what's good or bad here; I haven't had a chance to look at all the new Rx stuff from DevLabs yet...

My own experiences suggest that:

  • Those who subscribe to observables should never throw from their OnNext/OnError/OnCompleted callbacks. There is nothing reasonable an observable can do when a subscriber throws. (This is similar to events.) Most likely the exception will just bubble up to a top-level catch-all handler or crash the app.
  • Sources probably should be "logically single-threaded". I think it is harder to write clients that can cope with concurrent OnNext calls; even if successive calls come from different threads, it helps if they never overlap.
  • It's definitely useful to have a base/helper class that enforces some 'contracts'.
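A helper that enforces the contracts listed above could be sketched as an observer decorator that serializes calls with a lock and drops anything after the first terminal notification. `ContractObserver<T>`, `RecordingObserver` and `ContractDemo` are hypothetical names; Rx has its own internal helpers for this whose details may differ. Where the F# version above asserts on a violation, this sketch silently ignores late notifications; which of the two is better is a design choice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical decorator enforcing two contracts from the list above:
//  1. calls into the wrapped observer never overlap (a lock serializes them);
//  2. nothing is delivered after the first OnError or OnCompleted.
public sealed class ContractObserver<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private readonly object gate = new object();
    private bool stopped;

    public ContractObserver(IObserver<T> inner)
    {
        this.inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public void OnNext(T value)
    {
        lock (gate)
        {
            if (stopped) return; // late notification: silently ignored in this sketch
            inner.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        lock (gate)
        {
            if (stopped) return;
            stopped = true;
            inner.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (gate)
        {
            if (stopped) return;
            stopped = true;
            inner.OnCompleted();
        }
    }
}

// Demo observer that is deliberately not thread-safe (plain List<T>, no locking).
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) => Log.Add(value.ToString());
    public void OnError(Exception error) => Log.Add("error");
    public void OnCompleted() => Log.Add("completed");
}

public static class ContractDemo
{
    public static (int Count, string Last) Run()
    {
        var recorder = new RecordingObserver();
        var safe = new ContractObserver<int>(recorder);
        Parallel.For(0, 1000, i => safe.OnNext(i)); // many threads, serialized delivery
        safe.OnCompleted();
        safe.OnNext(-1);                     // ignored: already completed
        safe.OnError(new Exception("late")); // ignored as well
        return (recorder.Log.Count, recorder.Log[recorder.Log.Count - 1]);
    }
}
```

`ContractDemo.Run()` returns `(1001, "completed")`: the 1000 concurrent OnNext calls reach the non-thread-safe list one at a time, followed by exactly one OnCompleted. Calling the inner observer while holding the lock keeps the sketch simple, but it can deadlock if that observer blocks waiting for another thread that is itself trying to deliver a notification.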

I'm very curious if people can show more concrete advice along these lines.

Brian
Thanks, I had a crack at creating something similar in C# and ended up using the F# Map collection to avoid locking during enumeration. Another option is to use something like Eric Lippert's immutable AVL tree. I have convinced myself that it's the observer's responsibility to ensure that events are received in the proper context, and that the observable should just stick to raising events on the same thread each time (as you write).
jesperll
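If the observer takes responsibility for its own context, one way to do that is to wrap it in a decorator that posts each notification to a captured SynchronizationContext. `ContextObserver<T>`, `QueueContext`, `ThreadRecorder` and `ContextDemo` are hypothetical names (Rx's own `ObserveOn` operator covers this use case). `QueueContext` stands in for a UI message loop: Post only enqueues, and Drain runs the queued callbacks on whichever thread calls it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical decorator: forwards every notification through a SynchronizationContext,
// so the wrapped observer runs wherever that context executes posted callbacks.
public sealed class ContextObserver<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private readonly SynchronizationContext context;

    public ContextObserver(IObserver<T> inner, SynchronizationContext context)
    {
        this.inner = inner;
        this.context = context;
    }

    public void OnNext(T value) => context.Post(_ => inner.OnNext(value), null);
    public void OnError(Exception error) => context.Post(_ => inner.OnError(error), null);
    public void OnCompleted() => context.Post(_ => inner.OnCompleted(), null);
}

// Stand-in for a UI message loop: Post only queues; Drain runs queued work on the calling thread.
public sealed class QueueContext : SynchronizationContext
{
    private readonly ConcurrentQueue<(SendOrPostCallback Callback, object State)> queue =
        new ConcurrentQueue<(SendOrPostCallback Callback, object State)>();

    public override void Post(SendOrPostCallback d, object state) => queue.Enqueue((d, state));

    public void Drain()
    {
        while (queue.TryDequeue(out var item)) item.Callback(item.State);
    }
}

// Demo observer that records which thread each OnNext ran on.
public sealed class ThreadRecorder : IObserver<int>
{
    public readonly List<int> ThreadIds = new List<int>();
    public void OnNext(int value) => ThreadIds.Add(Environment.CurrentManagedThreadId);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ContextDemo
{
    public static (int BeforeDrain, int AfterDrain, bool AllOnCaller) Run()
    {
        var context = new QueueContext();
        var recorder = new ThreadRecorder();
        var observer = new ContextObserver<int>(recorder, context);

        // The source raises its events on a separate thread...
        var source = new Thread(() => { for (int i = 0; i < 3; i++) observer.OnNext(i); });
        source.Start();
        source.Join();
        int before = recorder.ThreadIds.Count; // notifications are only queued so far

        // ...and the "UI thread" (here, the caller) runs them when it pumps the queue.
        context.Drain();
        int caller = Environment.CurrentManagedThreadId;
        return (before, recorder.ThreadIds.Count,
                recorder.ThreadIds.TrueForAll(id => id == caller));
    }
}
```

`ContextDemo.Run()` returns `(0, 3, true)`: nothing reaches the observer until the calling thread pumps the queue, and then every callback runs on that thread. Ordering depends on the context: a UI message loop typically runs posted callbacks in order, whereas the base `SynchronizationContext` posts them to the thread pool with no ordering guarantee.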
A: 

Just one remark regarding this implementation:

Now that concurrent collections have been introduced in .NET Framework 4, it is probably better to use ConcurrentDictionary instead of a simple dictionary.

It saves you from having to handle locks on the collection yourself.

adi.

Adiel
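A minimal sketch of the ConcurrentDictionary suggestion, assuming keys come from `Interlocked.Increment` and unsubscription uses `TryRemove`; `ConcurrentObservable<T>`, `SumObserver` and `ConcurrentDemo` are hypothetical names. One caveat: enumerating a ConcurrentDictionary is safe while other threads modify it, but the enumeration is not a point-in-time snapshot, so an observer added or removed during a notification may or may not receive that notification. The immutable-map versions above give snapshot semantics; this one trades them for simplicity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical subscriber registry with no explicit locks.
public class ConcurrentObservable<T> : IObservable<T>
{
    private readonly ConcurrentDictionary<int, IObserver<T>> observers =
        new ConcurrentDictionary<int, IObserver<T>>();
    private int nextKey;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        int key = Interlocked.Increment(ref nextKey);
        observers[key] = observer;
        return new Subscription(() => observers.TryRemove(key, out _));
    }

    // Safe to enumerate while other threads add or remove entries,
    // but the enumeration is not a point-in-time snapshot.
    public void OnNext(T value) { foreach (var kv in observers) kv.Value.OnNext(value); }
    public void OnError(Exception error) { foreach (var kv in observers) kv.Value.OnError(error); }
    public void OnCompleted() { foreach (var kv in observers) kv.Value.OnCompleted(); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action remove;
        public Subscription(Action remove) { this.remove = remove; }
        // TryRemove on a key that is already gone just returns false, so repeated Dispose is harmless.
        public void Dispose() => remove();
    }
}

// Demo observer that sums the values it receives.
public sealed class SumObserver : IObserver<int>
{
    public int Sum;
    public void OnNext(int value) => Sum += value;
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ConcurrentDemo
{
    public static (int A, int B) Run()
    {
        var source = new ConcurrentObservable<int>();
        var a = new SumObserver();
        var b = new SumObserver();
        IDisposable subA = source.Subscribe(a);
        source.Subscribe(b);
        source.OnNext(10);
        subA.Dispose();
        source.OnNext(5);
        return (a.Sum, b.Sum);
    }
}
```

`ConcurrentDemo.Run()` returns `(10, 15)`: the first observer only sees the value published before it unsubscribed.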
A: 

Yes, the yield keyword is lovely; maybe there will be something similar for IObservable(OfT)? [edit: In Erik Meijer's PDC '09 talk he says "yes, watch this space" to a declarative yield for generating observables.]

For something close (instead of rolling your own), check out the bottom of the "(not yet) 101 Rx Samples" wiki, where the team suggests using the Subject(T) class as a "backend" to implement an IObservable(OfT). Here's their example:

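using System;                   // also provides the Subscribe(Action<T>) extension
using System.Reactive.Linq;     // AsObservable (namespaces as in the current System.Reactive package)
using System.Reactive.Subjects; // Subject<T>

public class Order
{
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    // AsObservable hides the Subject, so callers can subscribe but cannot call OnNext themselves.
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

public static class Program
{
    private static void Main()
    {
        var order = new Order();
        order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

        order.MarkPaid(DateTime.Now);
    }
}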
David Cuccia