views:

135

answers:

2

So I'm just playing around with RX and learning it. I started playing with Events, and wanted to know how to subscribe to events, and process the results in batches asynchronously. Allow me to explain with code:

Simple class that raises events:

public class EventRaisingClass
{
   public event EventHandler<SomeEventArgs> EventOccured;

   //some other code that raises event...
}

public class SomeEventArgs : EventArgs
{
    public SomeEventArgs(int data)
    {
        this.SomeArg = data;
    }

    public int SomeArg { get; private set; }
}

Then my Main:

public static void Main(string[] args)
{
    var eventRaiser = new EventRaisingClass();
    IObservable<IEvent<SomeEventArgs>> observable = 
        Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...

}

As you can see in my comments, when you just call subscribe like that, all the subscription code happens synchronously. Is there a way out of the box using RX to have the Subscribe be called on different threads whenever there's a new batch of events to work on?

+2  A: 

I believe you're looking for SubscribeOn or ObserveOn, passing an IScheduler. There are several schedulers built-in under System.Concurrency; some of them use whatever thread is current, and others use specific threads.

This video has more info on the scheduler concept.

The Rx team also recently released a hands-on labs document which is the closest thing to a tutorial right now.

Stephen Cleary
+2  A: 
bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...

SubscribeOn is to specify the schedule on which so-called "subscription side effects" are happening. For example, your observable can open a file each time somebody subscribes.

ObserveOn is to specify the schedule on which the call to the observer will happen every time when there is a new value. In practice, it is used more often than SubscribeOn.

Sergey Aldoukhov