I have a typed message broker similar to what Caliburn provides:

public interface IMessageBroker
{
    void Publish<T>(T message);
    IDisposable Subscribe<T>(Action<T> subscriber);
}

How can I convert subscriptions to IObservable?

I want an extension method, something like this:

public static IObservable<T> Subscribe<T>(this IMessageBroker messageBroker)
{
    var subject = new Subject<T>();
    messageBroker.Subscribe<T>(subject.OnNext);
    return subject;
}

The problem with this implementation is that the IDisposable returned by messageBroker.Subscribe is discarded, so I can't unsubscribe and the subscription leaks.

A better name for the Subscribe method is also welcome.

+2  A: 

Try this (untested):

public static IObservable<T> ToObservable<T>(this IMessageBroker messageBroker)
{
    IObservable<T> observable = Observable.CreateWithDisposable<T>(o =>
        {
            return messageBroker.Subscribe<T>(o.OnNext);
        });
    return observable;
}

Which you should be able to use like this:

var observableBroker = messageBroker.ToObservable<int>();
var subject = new Subject<int>();
observableBroker.Subscribe(subject.OnNext);

// Alternatively, there are overloads of Observable.Subscribe which take lambdas:
observableBroker.Subscribe(t => DoSomethingWith(t));
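
To check that disposing the Rx subscription really unsubscribes from the broker, here is a minimal sketch. It assumes a current System.Reactive package, where Observable.CreateWithDisposable no longer exists and Observable.Create accepts a function returning IDisposable. InMemoryBroker, SubscriberCount and Demo are hypothetical names made up for this example; they are not part of Caliburn or of the original question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public interface IMessageBroker
{
    void Publish<T>(T message);
    IDisposable Subscribe<T>(Action<T> subscriber);
}

// Hypothetical in-memory broker, only here to make the sketch runnable.
public sealed class InMemoryBroker : IMessageBroker
{
    private readonly List<Delegate> _subscribers = new List<Delegate>();

    // Exposed so the demo can observe whether a handler is still registered.
    public int SubscriberCount => _subscribers.Count;

    public void Publish<T>(T message)
    {
        // Iterate over a copy so handlers may unsubscribe while being called.
        foreach (var s in _subscribers.ToArray())
        {
            if (s is Action<T> action) action(message);
        }
    }

    public IDisposable Subscribe<T>(Action<T> subscriber)
    {
        _subscribers.Add(subscriber);
        // Disposing the returned token removes the handler again.
        return Disposable.Create(() => _subscribers.Remove(subscriber));
    }
}

public static class MessageBrokerExtensions
{
    public static IObservable<T> ToObservable<T>(this IMessageBroker messageBroker)
    {
        // The broker's IDisposable becomes the teardown of each Rx subscription.
        return Observable.Create<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
    }
}

public static class Demo
{
    public static void Main()
    {
        var broker = new InMemoryBroker();
        var received = new List<int>();

        IDisposable subscription = broker.ToObservable<int>().Subscribe(received.Add);
        broker.Publish(1);       // delivered to the Rx subscriber
        subscription.Dispose();  // also disposes the broker subscription
        broker.Publish(2);       // nobody is listening any more

        Console.WriteLine(string.Join(",", received));
        Console.WriteLine(broker.SubscriberCount);
    }
}
```

Because the broker subscription is created inside the Observable.Create callback, nothing is registered with the broker until someone subscribes to the observable, and each Rx subscriber gets its own broker subscription that goes away when it is disposed. That is the difference from the Subject-based version in the question, which registers once and never releases the handler.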
Benjol
Am I missing something or is subject really not used anywhere?
Konstantin Spirin
@Konstantin, woops! You're right, corrected (but still not tested...)
Benjol
Your suggestion has been tested now and it is exactly what I wanted :)
Konstantin Spirin
@Konstantin, good :) Note that ToObservable (as a name) is also consistent with the naming used by the Rx extensions themselves.
Benjol