I want to put Reactive Extensions for .NET (Rx) to good use and would like some input on a few basic tasks. To illustrate what I'm trying to do, here is a contrived example in which I have an external component with asynchronous events:

class Component {

  public void BeginStart() { ... }

  public event EventHandler Started;

}

The component is started by calling BeginStart(). This method returns immediately, and later, when the component has completed startup, the Started event fires.

I want to create a synchronous start method by wrapping the component and waiting until the Started event fires. This is what I've come up with so far:

class ComponentWrapper {

  readonly Component component = new Component();

  void StartComponent() {
    var componentStarted =
      Observable.FromEvent<EventArgs>(this.component, "Started");
    using (var startedEvent = new ManualResetEvent(false))
      using (componentStarted.Take(1).Subscribe(e => { startedEvent.Set(); })) {
        this.component.BeginStart();
        startedEvent.WaitOne();
      }
  }

}

I would like to get rid of the ManualResetEvent, and I expect that Rx has a solution. But how?

+2  A: 

Something like this should do it:

var replay = Observable
    .FromEvent<EventArgs>(this.component, "Started")
    .Replay();
replay.Connect();
component.BeginStart();
replay.First();
PL
Not sure why you need .Skip(1)... Also, in this scenario Publish does not need a null argument.
Sergey Aldoukhov
You are absolutely right. When I was writing the response I was thinking about the chance that the event might fire before First() subscribes, in which case the code would block forever. The correct solution here is to use Replay(). I'll update my response.
PL
+1  A: 

PL's answer is perfectly good for your spec, but I thought you might get better results by not fighting Rx with .First() and instead embracing it by creating an observable for your component:

    public static IObservable<Unit> AsObservable(this Component component)
    {
        return Observable.Defer(() =>
        {
            component.BeginStart();
            return Observable
                .FromEvent<EventArgs>(component, "Started")
                .Select(_ => new Unit());
        });
    }

Then you could use it as blocking:

new Component().AsObservable().First();

Non-blocking:

new Component().AsObservable().Subscribe(_ => Console.WriteLine("Done"));

Hot:

var pub = new Component().AsObservable().Publish();
pub.Subscribe(_ => Console.WriteLine("Sub1"));
pub.Subscribe(_ => Console.WriteLine("Sub2"));
pub.Connect();  // the component is started just once for both subscriptions

Composable:

new Component().AsObservable().Delay(TimeSpan.FromSeconds(1));

etc...

EDIT: For the case where you have to wait on multiple events and collect their information, the following variation could be used:

public static IObservable<EventArgs> AsObservable(this Component component)
{
    return Observable.Defer(() =>
    {
        component.BeginStart();
        return 
            Observable.FromEvent<EventArgs>(component, "Started1").Take(1)
                .Merge(
            Observable.FromEvent<EventArgs>(component, "Started2").Take(1))
                .Select(evt => evt.EventArgs);
    });
}

With this one, if you want to block until completion, you might use .AsObservable().Last().

Sergey Aldoukhov
I really like this approach. However, I'm having trouble applying this solution to my real code. In actuality my component has several events with arguments I have to collect and wait for. I do this by using `Observable.And` and it works fine with my own approach as well as PL's. However, your solution blocks forever in `First` when I wait for more than one event.
Martin Liversage