



Hi all,

Copy and paste the following code into a new C# console app.

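using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var enumerator = new QueuedEnumerator<long>();
        var listenerWaitHandle = Listener(enumerator);

        Publisher(enumerator);
        listenerWaitHandle.WaitOne(); // block until the listener has drained the queue
    }

    // Thread B: consumes items on a thread-pool thread and signals when the sequence ends.
    private static AutoResetEvent Listener(IEnumerator<long> items)
    {
        var @event = new AutoResetEvent(false);
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (items.MoveNext())
            {
                Console.WriteLine("Received : " + items.Current);
                Thread.Sleep(2 * 1000);
            }
            (o as AutoResetEvent).Set();
        }, @event);
        return @event;
    }

    // Thread A: hands over one item at a time, then marks the sequence as complete.
    private static void Publisher(QueuedEnumerator<long> enumerator)
    {
        for (int i = 0; i < 10; i++)
        {
            enumerator.Set(i);
            Console.WriteLine("Sent : " + i);
            Thread.Sleep(1 * 1000);
        }
        enumerator.Finish();
    }

    class QueuedEnumerator<T> : IEnumerator<T>
    {
        private Queue _internal = Queue.Synchronized(new Queue());
        private T _current;
        private volatile bool _finished; // written by thread A, read by thread B
        private AutoResetEvent _setted = new AutoResetEvent(false);

        public void Finish()
        {
            _finished = true;
            _setted.Set(); // wake a listener that is still waiting for an item
        }

        public void Set(T item)
        {
            while (_internal.Count > 3)
            {
                Console.WriteLine("I'm full, give the listener some slack !");
                Thread.Sleep(3 * 1000);
            }
            _internal.Enqueue(item);
            _setted.Set();
        }

        public T Current
        {
            get { return _current; }
        }

        public void Dispose()
        {
        }

        object System.Collections.IEnumerator.Current
        {
            get { return _current; }
        }

        public bool MoveNext()
        {
            if (_finished && _internal.Count == 0)
                return false;
            else if (_internal.Count > 0)
            {
                _current = (T)_internal.Dequeue();
                return true;
            }
            _setted.WaitOne(); // nothing queued yet: wait for Set or Finish
            return MoveNext();
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}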

There are two threads, A and B.

Thread A can provide one instance at a time and calls the Set method. Thread B wants to receive a sequence of instances (provided by thread A).

So the goal is literally to transform a series of Add(item), Add(item), ... calls into an IEnumerable between different threads.

Other solutions are also welcome, of course!

greetings, Tim

+1  A: 

Sure - this code might not be the best way to do it, but here was my initial stab at it:

Subject<Item> toAddObservable;
ListObservable<Item> buffer;

void Init()
{
    // Subjects are an IObservable we can trigger by-hand, they're the
    // mutable variables of Rx
    toAddObservable = new Subject(Scheduler.TaskPool);

    // ListObservable will hold all our items until someone asks for them
    // It will yield exactly *one* item, but only when toAddObservable
    // is completed.
    buffer = new ListObservable<Item>(toAddObservable);
}

void Add(Item to_add)
{
    lock (this) {
        // Subjects themselves are thread-safe, but we still need the lock
        // to protect against the reset in FetchResults
        toAddObservable.OnNext(to_add);
    }
}

IEnumerable<Item> FetchResults()
{
    IEnumerable<Item> ret = null;
    buffer.Subscribe(x => ret = x);

    lock (this) {
        // Completing the subject makes the buffer yield its single list
        toAddObservable.OnCompleted();
        Init();     // Recreate everything
    }

    return ret;
}
Paul Betts
Thanks for the answer, +1 for the effort :) However, I still find my current solution nicer; perhaps I was trying to solve something with Reactive that it isn't suited for :s
Tim Mahy
I think so, in the sense that Rx doesn't like to do the "freeze at an arbitrary point and return as Enumerable" thing. You can use something like Buffer, but really the idea is that you process items as they come in. The impedance mismatch comes from switching back and forth between the push model (IObservable) and the pull model (IEnumerable) here.
Paul Betts
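
For the push-to-pull handoff discussed above, one non-Rx option is a bounded BlockingCollection<T>, which plays roughly the same role as the hand-written QueuedEnumerator in the question. The following is only a minimal sketch, assuming .NET 4.5 or later; the names BridgeDemo and Run are hypothetical and do not come from the original posts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BridgeDemo
{
    // Hypothetical helper: pushes 0..4 from a producer task and pulls them
    // back out as an ordinary enumerable sequence on the calling thread.
    public static List<long> Run()
    {
        var received = new List<long>();

        // Bounded to 4 items: Add blocks while the collection is full,
        // which stands in for the manual "I'm full" sleep-and-retry.
        using (var queue = new BlockingCollection<long>(boundedCapacity: 4))
        {
            // Thread A (push side): one item at a time, then signal completion.
            var producer = Task.Run(() =>
            {
                for (long i = 0; i < 5; i++)
                    queue.Add(i);
                queue.CompleteAdding();
            });

            // Thread B (pull side): blocks until an item arrives and ends
            // once CompleteAdding has been called and the collection is empty.
            foreach (long item in queue.GetConsumingEnumerable())
                received.Add(item);

            producer.Wait();
        }
        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Here Add corresponds to Set in the question, CompleteAdding to Finish, and GetConsumingEnumerable gives thread B a sequence it can iterate with foreach. Whether this fits depends on whether the consumer really needs a blocking pull model rather than processing items as they are pushed.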