I'm struggling with my first simple "hello world" RX application. I'm using VS2010 RC, plus the latest RX download.

The following is the simple console app:

    class Program
    {
        static void Main(string[] args)
        {

            var channel = new MessageChannel()
                .Where(m => m.process)
                .Subscribe((MyMessage m) => Console.WriteLine(m.subject));

            //channel.GenerateMsgs();
        }
    }

    public class MyMessage
    {
        public string subject;
        public bool process;
    }

    public class MessageChannel: IObservable<MyMessage>
    {
        List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();

        public IDisposable Subscribe(IObserver<MyMessage> observer)
        {
            observers.Add(observer);
            return observer as IDisposable;
        }

        public void GenerateMsgs()
        {
            foreach (IObserver<MyMessage> observer in observers)
            {
                observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
            }
        }
    }

I get an ArgumentNullException at the Where clause. Here's the stack trace:

System.ArgumentNullException was unhandled
  Message=Value cannot be null.
Parameter name: disposable
  Source=System.Reactive
  ParamName=disposable
  StackTrace:
       at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
       at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
       at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
       at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
       at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
       at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
       at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
       at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
       at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
       at System.Threading.ThreadHelper.ThreadStart()
  InnerException: 
A:

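This line seems to be causing the fuss:

    return observer as IDisposable;

You are not supposed to assume that the observer is disposable; you are supposed to return a disposable object that knows how to unsubscribe. The `as` cast yields null whenever the observer does not implement IDisposable, and that null is what Rx rejects with the ArgumentNullException (parameter name `disposable`) in your stack trace.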

The method returns a reference to an IDisposable interface. This enables the observer to unsubscribe (that is, to stop receiving notifications) before the provider has finished sending them and called the subscriber's OnCompleted method.

You can make it work by doing something like:

    public class MessageChannel: IObservable<MyMessage>
    {
        // Same observer list as in the question.
        List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();

        // Returned from Subscribe; disposing it removes the observer from the channel.
        class Subscription : IDisposable {
            MessageChannel _c;
            IObserver<MyMessage> _obs;
            public Subscription(MessageChannel c, IObserver<MyMessage> obs) {
                _c = c; _obs = obs;
            }
            public void Dispose() {
                _c.Unsubscribe(_obs);
            }
        }

        public IDisposable Subscribe(IObserver<MyMessage> observer)
        {
            observers.Add(observer);
            return new Subscription(this, observer);
        }

        void Unsubscribe(IObserver<MyMessage> obs) {
            observers.Remove(obs);
        }

        // GenerateMsgs() is unchanged from the question.
    }
Frank Krueger
Many thanks Frank, that hit it on the head. I just needed to change the Subscription class to use IObserver rather than IObservable (I'm not complaining!). I used the MSDN sample provided for IObservable (http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx).
Jason Hyland
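
To check the unsubscribe behaviour discussed above, here is a minimal self-contained sketch that combines the answer with the IObserver correction from the comment. It deliberately does not reference the Rx assembly, only the `IObservable<T>`/`IObserver<T>` interfaces in `System`. `RecordingObserver` and `Demo` are hypothetical helpers introduced purely for illustration, and iterating over a snapshot in `GenerateMsgs` is an added assumption (so that an observer could unsubscribe from inside `OnNext`), not something from the original post.

```csharp
using System;
using System.Collections.Generic;

public class MyMessage
{
    public string subject;
    public bool process;
}

public class MessageChannel : IObservable<MyMessage>
{
    private readonly List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();

    // Token handed back from Subscribe; disposing it removes the observer.
    private sealed class Subscription : IDisposable
    {
        private readonly MessageChannel channel;
        private readonly IObserver<MyMessage> observer;

        public Subscription(MessageChannel channel, IObserver<MyMessage> observer)
        {
            this.channel = channel;
            this.observer = observer;
        }

        public void Dispose()
        {
            channel.observers.Remove(observer);
        }
    }

    public IDisposable Subscribe(IObserver<MyMessage> observer)
    {
        observers.Add(observer);
        return new Subscription(this, observer); // never null, unlike "observer as IDisposable"
    }

    public void GenerateMsgs()
    {
        // Assumption: iterate over a snapshot so the list can change during delivery.
        foreach (IObserver<MyMessage> observer in observers.ToArray())
        {
            observer.OnNext(new MyMessage { subject = "Hello!", process = true });
        }
    }
}

// Hypothetical observer that records every subject it receives.
public class RecordingObserver : IObserver<MyMessage>
{
    public readonly List<string> Subjects = new List<string>();

    public void OnNext(MyMessage value) { Subjects.Add(value.subject); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Demo
{
    // Returns how many messages the observer received.
    public static int Run()
    {
        var channel = new MessageChannel();
        var observer = new RecordingObserver();

        IDisposable subscription = channel.Subscribe(observer);
        channel.GenerateMsgs();   // delivered: observer is subscribed
        subscription.Dispose();   // unsubscribe
        channel.GenerateMsgs();   // not delivered: observer was removed

        return observer.Subjects.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 1
    }
}
```

Only the first `GenerateMsgs` call reaches the observer, because disposing the subscription removes it from the channel's list before the second call.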