I have an IObservable (named rows in the sample below) from the Reactive Extensions framework, and I want to attach an index number to each item it emits.

I've tried to implement this using the Zip function:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index })
    .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

...but unfortunately this throws:

ArgumentOutOfRangeException: Specified argument was out of the range of valid values. Parameter name: disposables

Am I misunderstanding the Zip function, or is there a problem with my code?

The Range part of the code doesn't seem to be the problem, and the IObservable hasn't received any events yet.

A: 

Apparently, the Zip extension method wraps the original custom IObservable in an anonymous observable, and subscribing to it creates a System.Collections.Generic.AnonymousObserver, which doesn't implement IDisposable. So you cannot implement the Subscribe method the way I had seen it done (and assumed was normal), which is:

public IDisposable Subscribe(IObserver<T> observer) {
  // ..add to observer list..
  // Broken: "as" yields null when the observer does not implement IDisposable.
  return observer as IDisposable;
}

The correct approach is more likely to return a disposable that removes the observer from the list:

return Disposable.Create(() => Observers.Remove(observer));

Note, though, that the collection will probably be modified while the Completed method runs (observers tend to unsubscribe when they are told the sequence has completed), so iterate over a copy of the list:

public void Completed()
{
    foreach (var observer in Observers.ToList())
    {
        observer.OnCompleted();
    }
}
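
Putting the two pieces above together, a custom observable might look roughly like the sketch below. The names RowSource, Publish and Unsubscriber are made up for illustration and are not from the original code. Unsubscriber is a hand-rolled stand-in for Disposable.Create so that the sketch compiles against the BCL alone; with Rx referenced, Disposable.Create(() => ...) should serve the same purpose. The sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical custom source; all type and member names here are illustrative.
public sealed class RowSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        // Always hand back a real IDisposable instead of "observer as IDisposable".
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Publish(T value)
    {
        // Iterate over a snapshot so unsubscribing during OnNext is safe.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    public void Completed()
    {
        // Same snapshot trick: observers may unsubscribe while being notified.
        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
    }

    // Minimal IDisposable that runs a callback, standing in for Disposable.Create.
    private sealed class Unsubscriber : IDisposable
    {
        private Action _onDispose;

        public Unsubscriber(Action onDispose) { _onDispose = onDispose; }

        public void Dispose()
        {
            // Run the removal at most once, even if Dispose is called repeatedly.
            var action = _onDispose;
            _onDispose = null;
            if (action != null) action();
        }
    }
}
```

Because Subscribe never returns null here, operators such as Zip that collect the returned disposables should no longer trip over a null entry.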
Toni Kielo
A: 

I'm not sure what your problem is. Does this work for you, and if so, what are you doing differently that is missing here?

    static void Main(string[] args)
    {
        var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
        rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
            new { Row = row, Index = index })
            .Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());

        Console.ReadLine();
    }
    static void ProcessRow(int row, int index) {
        Console.WriteLine("Row {0}, Index {1}", row, index);
    }
    static void Completed() {
    }
Richard Hein
The problem was with how I created my IDisposable unsubscriber. I stupidly copy/pasted a sample from somewhere, which was wrong for my case. As mentioned above, I should have used the Disposable.Create function; the way I had written it, Subscribe returned a null IDisposable, and that's why it threw a weird exception.
Toni Kielo
OK, thanks for the update.
Richard Hein
A: 

.Select has an overload to include the index:

rows.Select((row, index) => new { row, index });
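
One thing to watch for: the index passed to this overload starts at 0, whereas the Zip version in the question counts from 1. A minimal sketch of the 1-based equivalent, assuming the Rx assembly (System.Reactive) is referenced and reusing the sample list from the other answer:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class Program
{
    static void Main()
    {
        var rows = new List<int> { 4, 5, 1, 2, 5 }.ToObservable();

        // The selector's index is zero-based, so add 1 to match Enumerable.Range(1, ...).
        rows.Select((row, index) => new { Row = row, Index = index + 1 })
            .Subscribe(a => Console.WriteLine("Row {0}, Index {1}", a.Row, a.Index));
    }
}
```

This avoids the second sequence entirely, so there is nothing to Zip against.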
Sergey Aldoukhov