views:

150

answers:

1

I wondered if it's possible to use a queue (specifically as ConcurrentQueue) as the source of an IObservable? Something like;

  Queue = new ConcurrentQueue<IMessage>();
  var xs = Queue.AsEnumerable().ToObservable();

  xs.Subscribe((IMessage msg) =>
     {
        Console.WriteLine("Msg :" + msg.subject);
     });

I guess it doesn't really make sense because nothing is being dequeued. I'm trying to implement a non-blocking process which can subscribe to "messages" being pushed to observers, hence the use of a queue. I'm sure I should be able to do this with RX, but can't seem to get my head around it!

I'd be interested in any suggestions on how this could be implemented. Thanks!

+1  A: 

You're right, converting a Queue (concurrent or simple, doesn't matter) would only enumerate it, but not de-queue. "Real" implementation is possible, but more complex - see the link to a similar question I asked on the RX forum (which is still a better source of information on RX comparing to StackOverflow):

How to implement a single worker consumer producer queue using RX?

Sergey Aldoukhov
Thanks Sergey, confirms it's not quite as turn-key as I'd hoped :) I did find your original question on the RX forums and the OneByOneTask<T> does demo the 'Observable.GenerateInSequence' syntax. That looks favourite!
Jason Hyland