Currently, I'm using the RX Framework to implement a workflow-like message handling pipeline. Essentially I have a message producer (deserializes network messages and calls OnNext() on a Subject) and I have several consumers.
NOTE: If and transform are extension methods I have coded that simply return an IObservable.
A consumer does something like the following:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
is then consumed by another similar pipeline and this continues up until the top where it ends with someone calling Subscribe()
on the final pipeline. The issue I'm having is that the messages from the base don't propagate up unless subscribe is called on messages directly somewhere.
How can I push the messages up to the top of the stack? I know this is an unorthodox approach but I feel it makes the code very simple to understand what is occurring to a message. Can anyone suggest another way of doing the same if you feel this is a totally terrible idea?