views:

45

answers:

1

I have a program that has a ton of sensors producing data at a fairly high rate, and consumers that need to consume it. The consumers consume at very different rates.

Since I am using IObserver/IObservable, the trivial solution was to simply create a Task for each event and wrap the OnNext() call and the data in a lamda. This worked very well, I was surprised how little overhead there was over raw calls.

The problem is that some of these consumers need the order of events strictly enforced, and cannot miss any events. "PerferFairness" isn't good enough.

The best solution I have come up with is instead of wrapping the event/OnNext() pair, to wrap an Insert into a ParallelQueue, one queue per consumer, and have a thread on the other end of the Queue to make the OnNext() calls.

Three immediate problems with this approach. It's much, much slower than the Task/OnNext() wrapping solution. There is no Blocking Dequeue (or is there?) for ParallelQueue so the implementation is a bit tricky. The third is that this seems such a common problem that I can't imagine there isn't some way to enforce order that I missed, maybe something like multiple task factories sharing an underlying pool, each Factory with some setting that makes them strictly enforce order.

Anyone know the proper way to achieve what I'm trying to do?

EDIT : Any solution that involves a thread per consumer or producer doesn't work. Producers/Consumers form long chains, there are hundreds of each.

+1  A: 

No comment on the best abstraction for enforcing partial ordering, but if you use a BlockingCollection<T> wrapper around a ConcurrentQueue<T>, that will give you a blocking Take operation to dequeue elements. e.g.:

// the default is ConcurrentQueue, so you don't have to specify, but if you
// wanted different behavior you could use e.g. ConcurrentStack

var coll = new BlockingCollection<int>(new ConcurrentQueue<int>());

coll.Add(5); // blocks if the collection is at max capacity

int five = coll.Take(); // blocks if the collection is empty
mquander