views:

47

answers:

1

When implementing IObserver yourself, you know how well you would cope with a situation where OnNext is invoked from different threads, concurrently or sequentially, but what are the expectations of the built in Reactive Extension primitives when it comes to this? Will BufferWithTime, for example, cope with OnNext being invoked from multiple threads? Invoked concurrently from multiple threads?

+1  A: 

As long as you follow the two parts of the contract that Rx uses, you should be fine:

  • follow the following grammar: OnNext* (OnError | OnCompleted)?
  • make sure your messages are synchronized (no two messages in flight at the same time)

You can use the Observable.Synchronize operator to fix an implementation of IObservable that doesn't follow these two rules.

Jeffrey

Jeffrey van Gogh