tags:

views:

85

answers:

2

I'm just trying to do a simple event handler on a string variable, so that if the string changes, it will do a Console.WriteLine (using the new Reactive library from MS (Rx))

The issue that I have is that it will display the first bit when I instantiate the class ("RandomGuid : Mine?"), but after that, none of the stuff I change afterwards spits anything out to the console.

I went through the HOL from the MS website, but it goes straight from defining the Observable into reading values from a textbox, when all I want to do is watch whether a string was changed.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MynahBirds
{
    class Program
    {
        static void Main(string[] args)
        {
            List<Minah> minahs = new List<Minah>();

            for (int i = 0; i < 10; i++) {
                minahs.Add(new Minah());
            }

            foreach (var item in minahs) {
                item.peers = minahs;
            }

            minahs.ForEach(m => m.s = Observable.Return<string>("Mine"));
            minahs.ForEach(m => m.s = Observable.Return<string>("Whee"));
            minahs.ForEach(m => m.s = Observable.Return<string>("Argh"));

            Console.ReadLine();
        }
    }
    class Minah
    {
        Guid Id;
        public List<Minah> peers;
        IDisposable subscription;

        public IObservable<string> s = Observable.Return<string>("Mine?");

        public Minah()
        {
            try {
                this.Id = Guid.NewGuid();

                subscription = s.Subscribe((string a) => {
                    Console.WriteLine("{0} : {1}", this.Id, a);
                },
                (Exception ex) => {
                    Console.WriteLine("Error {0} hit", ex.ToString());
                },
                () => { });

            } catch (Exception ex) {
                Console.WriteLine(ex.ToString());
                Console.ReadLine();
                throw;
            }
        }
    }
}
+2  A: 

When you assign to m.s in the ForEach you are not updating the existing observable (which you have subscribed to) with a new value, instead you are creating new observables, which is what Observable.Return does. The code below does what I think you expect:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MynahBirds
{
    class Program
    {
        static void Main(string[] args)
        {
            List<Minah> minahs = new List<Minah>();

            for (int i = 0; i < 10; i++)
            {
                minahs.Add(new Minah());
            }

            foreach (var item in minahs)
            {
                item.peers = minahs;
            }

            minahs.ForEach(m => m.s.OnNext("Mine"));
            minahs.ForEach(m => m.s.OnNext("Whee"));
            minahs.ForEach(m => m.s.OnNext("Argh"));

            Console.ReadLine();
        }
    }
    class Minah
    {
        Guid Id;
        public List<Minah> peers;
        IDisposable subscription;

        public ISubject<string> s = new Subject<string>();

        public Minah()
        {
            try
            {
                this.Id = Guid.NewGuid();

                subscription = s.Subscribe((string a) =>
                {
                    Console.WriteLine("{0} : {1}", this.Id, a);
                },
                (Exception ex) =>
                {
                    Console.WriteLine("Error {0} hit", ex.ToString());
                },
                () => { });

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
                Console.ReadLine();
                throw;
            }
        }
    }
}

Instead of using Observable.Return<T>(), here I use a Subject, which is both an observer and an observable sequence. It updates all its subscriptions with each value it observes. So when OnNext is called on the subject, it is forwarded to all subscriptions.

If you need the initial value (Mine?) you can add s.OnNext("Mine?"); at the end of the Minah constructor.

Markus Johnsson
You are a rockstar, thank you for your time!
Manchuwook
A: 
using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Concurrent;

namespace MynahBirds
{
    class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.SetMaxThreads(100, 100);

            ConcurrentBag<Minah> minahs = new ConcurrentBag<Minah>();
            Stopwatch ti = new Stopwatch();

            ti.Start();

            Task.Factory.StartNew(() => {
                for (int i = 1; i < 2501; i++) {
                    minahs.Add(new Minah(i));
                };
            });

            Task.Factory.StartNew(() => {
                for (int i = 1; i < 2501; i++) {
                    minahs.Add(new Minah(i));
                };
            });

            Task.Factory.StartNew(() => {
                for (int i = 1; i < 2501; i++) {
                    minahs.Add(new Minah(i));
                };
            });

            Task.Factory.StartNew(() => {
                for (int i = 1; i < 2501; i++) {
                    minahs.Add(new Minah(i));
                };
            });

            Task.WaitAll();

            string[] alpha = { "Alpha", "Bravo", "Charlie", "Delta", "Eagle", "Foxtrot", "Gulf", "Hotel" };

            foreach (string s in alpha) {
                Console.WriteLine(s);
                Task.Factory.StartNew(() => minahs.AsParallel().ForAll(m => m.RepeatWord = s)).Wait();
            }

            minahs.AsParallel().ForAll(m => m.s.OnCompleted());

            ti.Stop();

            Console.WriteLine("{1} birds : {0} seconds", ti.Elapsed.TotalSeconds, minahs.Count);
            Console.ReadLine();
        }
    }
    class Minah
    {
        Guid Id;
        IDisposable subscription;

        public ISubject<string> s = new Subject<string>();

        private string _RepeatWord;
        public string RepeatWord
        {
            get
            {
                return _RepeatWord;
            }
            set
            {
                this.s.OnNext(value);
                _RepeatWord = value;
            }
        }

        public Minah(int i)
        {
            try {
                this.Id = Guid.NewGuid();

                subscription = s.Subscribe((string a) => {
                    Console.WriteLine("{0} : {1}", i, a);
                },
                (Exception ex) => {
                    Console.WriteLine("Error {0} hit", ex.ToString());
                },
                () => { /* Console.WriteLine("{0} : Completed", this.Id); */ });

            } catch (Exception ex) {
                Console.WriteLine(ex.ToString());
                Console.ReadLine();
                throw;
            }
        }
    }
}

This is what I ended up doing with help from Markus. More playing around with parallelism. Interestingly, if I remove the .Wait() from the end of the .ForAll(... RepeatWord = s), it will only do the last word in the sequence. I'm guessing that is a closure thing, but I'm not overly concerned about it.

Manchuwook