views:

63

answers:

1

Hi all,

Copy and paste the following code into a new C# console app.

/// <summary>
/// Console demo that hands items from a publishing thread to a listening
/// thread through an <see cref="IEnumerator{T}"/> backed by a small queue.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the listener on the thread pool, publishes on the calling thread,
    /// then waits for the listener to finish.
    /// </summary>
    static void Main(string[] args)
    {
        // Both objects own an OS wait handle, so release them deterministically.
        using (var enumerator = new QueuedEnumerator<long>())
        using (var listenerWaitHandle = Listener(enumerator))
        {
            Publisher(enumerator);

            // Block until the listener has drained everything that was published.
            listenerWaitHandle.WaitOne();
        }
    }

    /// <summary>
    /// Queues a thread-pool work item that prints every element pulled from
    /// <paramref name="items"/>, pausing two seconds after each one.
    /// </summary>
    /// <param name="items">Enumerator to drain; MoveNext may block.</param>
    /// <returns>An event that is signalled once MoveNext has returned false.</returns>
    private static AutoResetEvent Listener(IEnumerator<long> items)
    {
        var @event = new AutoResetEvent(false);
        ThreadPool.QueueUserWorkItem((o) =>
        {
            // Direct cast instead of 'as': a wrong state object should fail with a
            // clear InvalidCastException, not a NullReferenceException at the end.
            var done = (AutoResetEvent)o;

            while (items.MoveNext())
            {
                Console.WriteLine("Received : " + items.Current);
                Thread.Sleep(2 * 1000);
            }
            done.Set();
        }, @event);
        return @event;
    }

    /// <summary>
    /// Publishes the values 0..9, one per second, then marks the sequence as finished.
    /// </summary>
    /// <param name="enumerator">Queue-backed enumerator that receives the values.</param>
    private static void Publisher(QueuedEnumerator<long> enumerator)
    {
        for (int i = 0; i < 10; i++)
        {
            enumerator.Set(i);
            Console.WriteLine("Sended : " + i);
            Thread.Sleep(1 * 1000);
        }
        enumerator.Finish();
    }

    /// <summary>
    /// An enumerator fed by a producer thread through <see cref="Set"/> and
    /// consumed by another thread through <see cref="MoveNext"/>, which blocks
    /// while the queue is empty and the producer has not called <see cref="Finish"/>.
    /// </summary>
    class QueuedEnumerator<T> : IEnumerator<T>
    {
        // Set() backs off once this many items are waiting to be consumed.
        private const int Capacity = 4;

        // Generic queue (no boxing of T); every access goes through _gate.
        private readonly Queue<T> _internal = new Queue<T>();
        private readonly object _gate = new object();

        // Signalled whenever an item is enqueued or the sequence is finished.
        private readonly AutoResetEvent _setted = new AutoResetEvent(false);

        private T _current;

        // Written by the producer, read by the consumer.
        private volatile bool _finished;

        /// <summary>
        /// Marks the sequence as complete and wakes a consumer blocked in MoveNext.
        /// Call this only after the last <see cref="Set"/>.
        /// </summary>
        public void Finish()
        {
            _finished = true;
            _setted.Set();
        }

        /// <summary>
        /// Enqueues <paramref name="item"/>. While the queue already holds
        /// <c>Capacity</c> items, prints a notice, sleeps three seconds and retries.
        /// </summary>
        public void Set(T item)
        {
            // Loop rather than recurse so a slow consumer cannot grow the stack.
            while (true)
            {
                lock (_gate)
                {
                    if (_internal.Count < Capacity)
                    {
                        _internal.Enqueue(item);
                        break;
                    }
                }

                Console.WriteLine("I'm full, give the listener some slack !");
                Thread.Sleep(3 * 1000);
            }

            _setted.Set();
        }

        /// <summary>The item most recently dequeued by <see cref="MoveNext"/>.</summary>
        public T Current
        {
            get { return _current; }
        }

        /// <summary>Releases the wait handle owned by this enumerator.</summary>
        public void Dispose()
        {
            _setted.Dispose();
        }


        object System.Collections.IEnumerator.Current
        {
            get { return _current; }
        }

        /// <summary>
        /// Dequeues the next item into <see cref="Current"/>, blocking while the
        /// queue is empty and <see cref="Finish"/> has not been called.
        /// </summary>
        /// <returns>
        /// true when an item was dequeued; false once the sequence is finished
        /// and the queue is empty.
        /// </returns>
        public bool MoveNext()
        {
            while (true)
            {
                // Read the flag BEFORE looking at the queue: Finish() follows the
                // last Set(), so "finished, then empty" really means nothing is left.
                bool finished = _finished;

                lock (_gate)
                {
                    if (_internal.Count > 0)
                    {
                        _current = _internal.Dequeue();
                        return true;
                    }
                }

                if (finished)
                    return false;

                // Nothing available yet: sleep until Set() or Finish() signals.
                _setted.WaitOne();
            }
        }

        /// <summary>No-op: items that were already consumed cannot be replayed.</summary>
        public void Reset()
        {
        }
    }
}

2 threads (A,B)

Thread A can provide one instance at a time and calls the Set method. Thread B wants to receive a sequence of instances (provided by thread A).

So I am literally transforming a series of Add(item), Add(item), ... calls into an IEnumerable that is consumed on a different thread.

Other solutions are also welcome, of course!

greetings, Tim

+1  A: 

Sure - this code might not be the best way to do it, but here was my initial stab at it:

// Hand-triggered observable; created in Init and completed in FetchResults.
Subject<Item> toAddObservable;
// Wraps toAddObservable (see Init); FetchResults subscribes to it to obtain the items.
ListObservable<Item> buffer;

// (Re)creates the subject and the list buffer that collects what it emits.
void Init()
{
    // Subjects are an IObservable we can trigger by-hand, they're the 
    // mutable variables of Rx
    // The type argument is required: the field is declared as Subject<Item>.
    toAddObservable = new Subject<Item>(Scheduler.TaskPool);

    // ListObservable will hold all our items until someone asks for them
    // It will yield exactly *one* item, but only when toAddObservable
    // is completed.
    buffer = new ListObservable<Item>(toAddObservable);
}

// Pushes one item into the current subject.
void Add(Item to_add)
{
    lock (this) {
        // Subjects themselves are thread-safe, but we still need the lock
        // to protect against the reset in FetchResults
        // Use the declared field; the snippet previously referenced an
        // undeclared name (ToAddOnAnotherThread) here.
        toAddObservable.OnNext(to_add);
    }
}

// Completes the current subject, re-creates the subject/buffer pair via Init,
// and returns whatever the subscription callback has stored by then.
// NOTE(review): if the buffer delivers its list asynchronously, the result
// may still be null when this method returns — confirm against the scheduler.
IEnumerable<Item> FetchResults()
{
    IEnumerable<Item> collected = null;

    // Subscribe first so the callback is in place before completion is signalled.
    buffer.Subscribe(items => collected = items);

    lock (this) {
        // Order matters: complete the old subject, then swap in fresh instances.
        toAddObservable.OnCompleted();
        Init();
    }

    return collected;
}
Paul Betts
Thanks for the answer, +1 for the effort :) However, I still find my current solution nicer. Perhaps I was trying to solve something with Reactive that it is not suited for.
Tim Mahy
I think so, in the sense that Rx doesn't like to "freeze at an arbitrary point and return as an Enumerable". You can use something like Buffer, but the idea is really that you process items as they come in. The impedance mismatch comes from switching back and forth between the push model (IObservable) and the pull model (IEnumerable).
Paul Betts