After experimenting with IObservables, I've decided to try them for processing incoming messages from a message bus. Essentially, I get an IObservable<Request>, and each Request contains the functions needed to send a reply.
At a point during processing I have to deserialize the data and convert it from a Request to a Command object that contains what it actually needs to do. Command is not related to Request.
After deserializing it, I transform it into the proper response; however, to send the response I need the original Request object. I want to achieve this while keeping the code highly readable. So far I've used extension methods and lambda expressions to get the following (where requests is the IObservable<Request>):
requestProcessor = requests
    .Deserialize<IdentityRequest>()
    .Where(idRequest => idRequest.Address != null)
    .Select(idRequest => new IdentityResponse() { Identity = identityTable[idRequest.Address.Address] })
    .Serialize()
    .Zip(requests, (response, request) => new { request = request, response = response })
    .Subscribe(data => data.request.Respond(data.response, ObjectTypes.IdentityResponse));
My question is: since all the operators before the Zip call take time to process, will Zip pair each response with the request it was produced from (i.e. the original input alongside its separately processed counterpart) when there is a constant stream of messages? How can I test this?
Is there a better way of doing this?
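For the testing part, this is roughly the kind of check I have in mind, as a minimal sketch rather than my real code. It assumes the System.Reactive package and uses a Subject<int> as a hypothetical stand-in for the message bus: plain integers replace Request objects, an even-number filter replaces the Address check, and multiplying by 10 replaces building the response. The class and method names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipPairingCheck
{
    // Pushes a few known values through a pipeline shaped like the one above
    // and records which "request" ends up next to which "response".
    public static List<string> Run()
    {
        var pairs = new List<string>();

        // Hypothetical stand-in for the IObservable<Request> from the bus.
        var requests = new Subject<int>();

        using (requests
            .Where(n => n % 2 == 0)   // stands in for the Address != null filter
            .Select(n => n * 10)      // stands in for building the response
            .Zip(requests, (response, request) => new { request, response })
            .Subscribe(p => pairs.Add($"request={p.request} response={p.response}")))
        {
            // Simulate a steady stream of incoming messages.
            for (var n = 1; n <= 4; n++)
            {
                requests.OnNext(n);
            }
            requests.OnCompleted();
        }

        return pairs;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

If Zip keeps the original and processed inputs together, every printed response should be ten times the request printed next to it; any other pairing would mean the two streams have drifted apart.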