Given the benefits of composable events as offered by the Reactive Extensions (Rx) framework, I'm wondering whether my classes should stop pushing .NET events, and instead expose Rx observables.

For instance, take the following class using standard .NET events:

public class Foo
{
   private int progress;
   public event EventHandler ProgressChanged;

   public int Progress
   {
      get { return this.progress; }
      set
      {
         if (this.progress != value)
         {
            this.progress = value;

            // Raise the event while checking for no subscribers and preventing unsubscription race condition.
            var progressChanged = this.ProgressChanged;
            if (progressChanged != null)
            {
                progressChanged(this, new EventArgs());
            }
         }
      }
   }
}

That's a lot of monotonous plumbing.

This class could instead use some sort of observable to replace this functionality:

public class Foo
{
   public Foo()
   {
       this.Progress = some new observable;
   }

   public IObservable<int> Progress { get; private set; }
}

Far less plumbing. Intention is no longer obscured by plumbing details. This seems beneficial.

My questions for you fine StackOverflow folks are:

  1. Would it be good/worthwhile to replace standard .NET events with IObservable<T> values?
  2. If I were to use an observable, what kind would I use here? Obviously I need to push values to it (e.g. Progress.UpdateValue(...) or something).
A: 

Apart from the fact that your existing eventing code could be terser:

    public event EventHandler ProgressChanged = delegate {};

    ...
       set {
          ... 
          // no need for null check anymore       
          ProgressChanged(this, new EventArgs());
   }

I think by switching to IObservable<int> you are just moving complexity from the callee to the caller. What if I just want to read the int?

-Oisin

x0n
That has the downside of creating an extra object on the heap, as well as raising ProgressChanged even if progress really didn't change. Also, I lose composability using standard events. This actually puts a great burden on the caller. Want to handle an event just once? Caller has to keep track of the event and unsubscribe after invocation. Handler goes out of scope? Caller has to make sure to unhook the event or we get a memory leak. Want to compose 2 events together like MouseDown and MouseMove for drag/drop? Caller has to coordinate that whole mess. Composability is good. Rx makes it easy.
Judah Himango
To read the int, I'd be providing some custom observable such that you could say Progress.Value or something.
Judah Himango
@Judah - I suppose it depends on your audience and, more importantly, the overall purpose of your class and the needs of its consumers. Btw, your original code also has a race condition where it checks if the delegate is null; you should take a copy of ProgressChanged in a local and test that for null. I didn't bother pointing it out but if you insist on talking about the heap etc, I figure I should bring it up ;-)
x0n
I'm aware of that race condition, see my workaround here: http://stackoverflow.com/questions/248072/evil-use-of-extension-methods Nonetheless, a proper check for that race condition only adds yet more humdrum plumbing. Yuck! I'm liking Rx more and more. I just would like to hear the RX community's take on this.
Judah Himango
@Judah - sure. I agree, the race condition only bolsters your case. I'm sure the RX community will say "yeah, go for it," but like I said, Observables are more complex to work with than simple events, so your classes' consumers are probably more important to ask.
x0n
*Observables are more complex to work with than simple events* I'm not so sure: foo.ProgressChanged += SomeHandler, versus foo.ProgressChanged.Subscribe(SomeHandler). I would argue that when it comes to composition of events -- subscribe just once, subscribe on a sync context, compose multiple events, compose your own events (e.g. Subscribe when Progress == 100), it actually makes the consumer's life much easier. Don't you agree? If not, no prob, I'm not an RX evangelist, just speaking from my own experience.
Judah Himango
By the way, I've updated the post to include a check for the unsubscription race condition.
Judah Himango
Do you *really* want to guard against race conditions by caching the delegate? If the event has been unhooked, you risk that the cached delegate will access resources that were disposed by the code that unhooked the event.
snemarch
@snemarch - far less likely than the null reference exception that will be thrown should the event be unsubscribed from before you invoke it. Either way, problems.
x0n
+4  A: 

For #2, the most straightforward way is via a Subject:

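// The subject must be created before use; an uninitialized field would throw on OnNext.
private readonly Subject<int> _Progress = new Subject<int>();
public IObservable<int> Progress {
    get { return _Progress; }
}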

private void setProgress(int new_value) {
    _Progress.OnNext(new_value);
}

private void wereDoneWithWorking() {
    _Progress.OnCompleted();
}

private void somethingBadHappened(Exception ex) {
    _Progress.OnError(ex);
}

With this, now your "Progress" can not only notify when the progress has changed, but when the operation has completed, and whether it was successful. Keep in mind though, that once an IObservable has completed either via OnCompleted or OnError, it's "dead" - you can't post anything further to it.
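
A minimal sketch of that "dead" behavior, assuming the System.Reactive NuGet package is referenced. The class name SubjectLifetimeDemo and the log strings are made up for illustration and are not part of the original answer:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // assumption: System.Reactive NuGet package

public static class SubjectLifetimeDemo // hypothetical name, for illustration only
{
    // Pushes two values, completes the subject, then tries to push a third.
    public static List<string> Run()
    {
        var log = new List<string>();
        var progress = new Subject<int>();

        using (progress.Subscribe(
            value => log.Add("next:" + value),
            () => log.Add("completed")))
        {
            progress.OnNext(50);
            progress.OnNext(100);
            progress.OnCompleted();
            progress.OnNext(101); // ignored: the subject has already terminated
        }

        return log;
    }

    public static void Main()
    {
        // Prints: next:50, next:100, completed
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The value pushed after OnCompleted never reaches the subscriber, so a class that needs to report progress for a second operation would have to create a fresh subject.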

Paul Betts
Interesting. I'm unfamiliar with Subject. I see it's what I need as far as being able to update it. Still, I don't like the foo.SetProgress(42) call. I'd rather do something like foo.Progress.Value = 42; Maybe a custom Observable is what I really need here.
Judah Himango
I guess my objection to using Subject boils down to putting more work in the callee -- now I have to provide a separate setter method for each observable in my class. I'm definitely leaning towards a custom observable now.
Judah Himango
You don't *have* to use a separate setter, I just wanted to make it more clear - if you want your class to directly fiddle with the Subject, you can do that too
Paul Betts
Paul, just saw your MVVM + RX framework on the RX forums. Very interesting indeed. My reason for asking this whole question is for use in view models in Silverlight. Maybe I should take a hard look at your framework.
Judah Himango
@Judah Please do! In RxXaml, your scenario is handled by ObservableAsPropertyHelper (http://bit.ly/az3VeL for the blog entry on it). Check out my blog posts about it at http://bit.ly/rxxamlblog and the code is at http://bit.ly/reactivexaml - if you run into problems or it doesn't work for you for any reason, please Email me and tell me about it!
Paul Betts
Alright. I'll have a look tonight when I get home from work. Thanks!
Judah Himango
Adding .AsObservable() to your Progress property's getter (return _Progress.AsObservable();) will prevent consumers from casting the returned IObservable<int> to Subject<int> and then calling OnNext, OnError or OnCompleted on your internal Subject<int>. A short discussion can be found [here](http://social.msdn.microsoft.com/Forums/en/rx/thread/f8728ec3-ab0c-4d8e-849e-c419c87a42a7).
Rusty
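
To make the difference concrete, here is a small sketch, assuming the System.Reactive NuGet package is referenced. The Worker class and its two property names are hypothetical and exist only to compare the two getters side by side:

```csharp
using System;
using System.Reactive.Linq;     // AsObservable()
using System.Reactive.Subjects; // Subject<T>

public class Worker // hypothetical class, for illustration only
{
    private readonly Subject<int> progress = new Subject<int>();

    // Returns the subject instance itself, merely typed as IObservable<int>.
    public IObservable<int> RawProgress
    {
        get { return this.progress; }
    }

    // Returns a wrapper that forwards subscriptions but is not a Subject<int>.
    public IObservable<int> HiddenProgress
    {
        get { return this.progress.AsObservable(); }
    }
}

public static class AsObservableDemo
{
    public static void Main()
    {
        var worker = new Worker();

        // The raw property can be cast back, so a consumer could call OnNext on it.
        Console.WriteLine(worker.RawProgress is Subject<int>);    // True

        // The wrapped property cannot be cast back to the subject.
        Console.WriteLine(worker.HiddenProgress is Subject<int>); // False
    }
}
```

Whether the extra wrapper is worth it depends on how much you trust the consumers of the class, which is the trade-off discussed in the comments here.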
I'm not particularly worried about that - it's like worrying that people will use Reflection to access private fields, I don't think clients of my code are actively out to get me
Paul Betts
Yeah, I'm with Paul on this one. I do this all the time when exposing mutable collections as IEnumerable<T>.
Judah Himango
+1  A: 

Ok guys, seeing as how I think it's at least worth a shot to try this, and seeing as how RX's Subject<T> isn't quite what I'm looking for, I've created a new observable that fits my needs:

  • Implements IObservable<T>
  • Implements INotifyPropertyChanged to work with WPF/Silverlight binding.
  • Provides easy get/set semantics.

I call the class Observable<T>.

Declaration:

/// <summary>
/// Represents a value whose changes can be observed.
/// </summary>
/// <typeparam name="T">The type of value.</typeparam>
public class Observable<T> : IObservable<T>, INotifyPropertyChanged
{
    private T value;
    private readonly List<AnonymousObserver> observers = new List<AnonymousObserver>(2);
    private PropertyChangedEventHandler propertyChanged;

    /// <summary>
    /// Constructs a new observable with a default value.
    /// </summary>
    public Observable()
    {
    }

    public Observable(T initialValue)
    {
        this.value = initialValue;
    }

    /// <summary>
    /// Gets the underlying value of the observable.
    /// </summary>
    public T Value
    {
        get { return this.value; }
        set
        {
            var valueHasChanged = !EqualityComparer<T>.Default.Equals(this.value, value);
            this.value = value;

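            // Notify the observers of the value. Iterate over a snapshot so that an
            // observer subscribing during OnNext doesn't invalidate the enumeration.
            foreach (var observer in this.observers.Select(o => o.Observer).Where(o => o != null).ToList())
            {
                observer.OnNext(value);
            }

            // For INotifyPropertyChanged support, useful in WPF and Silverlight.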
            if (valueHasChanged && propertyChanged != null)
            {
               propertyChanged(this, new PropertyChangedEventArgs("Value"));
            }
        }
    }

    /// <summary>
    /// Converts the observable to a string. If the value isn't null, this will return
    /// the value string.
    /// </summary>
    /// <returns>The value .ToString'd, or the default string value of the observable class.</returns>
    public override string ToString()
    {
        return value != null ? value.ToString() : "Observable<" + typeof(T).Name + "> with null value.";
    }

    /// <summary>
    /// Implicitly converts an Observable to its underlying value.
    /// </summary>
    /// <param name="input">The observable.</param>
    /// <returns>The observable's value.</returns>
    public static implicit operator T(Observable<T> input)
    {
        return input.Value;
    }

    /// <summary>
    /// Subscribes to changes in the observable.
    /// </summary>
    /// <param name="observer">The subscriber.</param>
    /// <returns>A disposable object. When disposed, the observer will stop receiving events.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var disposableObserver = new AnonymousObserver(observer);
        this.observers.Add(disposableObserver);
        return disposableObserver;
    }

    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged
    {
        add { this.propertyChanged += value; }
        remove { this.propertyChanged -= value; }
    }

    class AnonymousObserver : IDisposable
    {
        public IObserver<T> Observer { get; private set; }

        public AnonymousObserver(IObserver<T> observer)
        {
            this.Observer = observer;
        }

        public void Dispose()
        {
            this.Observer = null;
        }
    }
}

Usage:

Consuming is nice and easy. No plumbing!

public class Foo
{
    public Foo()
    {
        Progress = new Observable<int>();
    }

    public Observable<int> Progress { get; private set; }
}

Usage is simple:

// Getting the value works just like normal, thanks to implicit conversion.
int someValue = foo.Progress;

// Setting the value is easy, too:
foo.Progress.Value = 42;

You can databind to it in WPF or Silverlight, just bind to the Value property.

<ProgressBar Value="{Binding Progress.Value}" />

Most importantly, you can compose, filter, project, and do all the sexy things RX lets you do with IObservables:

Filtering events:

foo.Progress
   .Where(val => val == 100)
   .Subscribe(_ => MyProgressFinishedHandler());

Automatic unsubscribe after N invocations:

foo.Progress
   .Take(1)
   .Subscribe(_ => OnProgressChangedOnce());

Composing events:

// Pretend we have an IObservable<bool> called IsClosed:
foo.Progress
   .TakeUntil(IsClosed.Where(v => v == true))
   .Subscribe(_ => ProgressChangedWithWindowOpened());

Nifty stuff!

Judah Himango
I wouldn't recommend managing subjects yourself due to threading issues, etc. See my answer
Richard Szalay
+1  A: 

I don't recommend managing your own subscriber list when there are built-in subjects that can do that for you. Using one also removes the need for carrying your own mutable copy of T.

Below is my (commentless) version of your solution:

public class Observable<T> : IObservable<T>, INotifyPropertyChanged 
{ 
    private readonly BehaviorSubject<T> values; 

    private PropertyChangedEventHandler propertyChanged; 

    public Observable() : this(default(T))
    {
    } 

    public Observable(T initialValue) 
    { 
        this.values = new BehaviorSubject<T>(initialValue);

        values.DistinctUntilChanged().Subscribe(FirePropertyChanged);
    }

    public T Value 
    { 
        get { return this.values.First(); } 
        set { values.OnNext(value); } 
    }

    private void FirePropertyChanged(T value)
    {
        var handler = this.propertyChanged;

        if (handler != null)
            handler(this, new PropertyChangedEventArgs("Value"));
    }

    public override string ToString() 
    { 
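        // This class has no value field, so read the current value through the property.
        var current = this.Value;
        return current != null ? current.ToString() : "Observable<" + typeof(T).Name + "> with null value."; 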
    } 

    public static implicit operator T(Observable<T> input) 
    { 
        return input.Value; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
        return values.Subscribe(observer);
    } 

    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged 
    { 
        add { this.propertyChanged += value; } 
        remove { this.propertyChanged -= value; } 
    } 
}
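
The class above leans on two pieces of Rx behavior: a BehaviorSubject<T> hands its current value to every new subscriber, and DistinctUntilChanged drops consecutive duplicates. A rough sketch of both, assuming the System.Reactive NuGet package is referenced (BehaviorSubjectDemo and the log prefixes are hypothetical names for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;     // DistinctUntilChanged()
using System.Reactive.Subjects; // BehaviorSubject<T>

public static class BehaviorSubjectDemo // hypothetical name, for illustration only
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var values = new BehaviorSubject<int>(0);

        // A new subscriber immediately receives the current value (0 here).
        values.Subscribe(v => log.Add("all:" + v));

        // This subscriber only hears about a value when it differs from the previous one.
        values.DistinctUntilChanged().Subscribe(v => log.Add("changed:" + v));

        values.OnNext(42);
        values.OnNext(42); // duplicate: reaches "all" but is filtered out of "changed"

        // A late subscriber starts with the latest value (42), not the initial one.
        values.Subscribe(v => log.Add("late:" + v));

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

This is why the wrapper needs neither its own subscriber list nor a separate backing field: the subject already tracks subscribers and remembers the latest value.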
Richard Szalay
Nifty! Thank you, Richard.
Judah Himango
Updated your code to remove the line "this.value = x" inside the DistinctUntilChanged subscription, as there is no value field.
Judah Himango