Hi, I have a pipeline set up entirely with Reactive Extensions. It starts with a stream of values (which can arrive at any time). Many different "modules" subscribe to that stream and each outputs a stream of calculated values, and those streams are in turn subscribed to by another level of modules that compute further values.

At the end of all this there is one class that listens to the values of all those modules and outputs some other kind of value itself. In normal operation the initial values arrive whenever they arrive, one by one. But I also want to be able to "replay" those values and get all the outputs, this time "as fast as possible". I still need those values to make sense, i.e. the final output must be traceable back to the original value that generated it. Not sure if I'm making sense here.

There are a few things I am considering, but none of which seem really clean.

One is to have "accelerated time", i.e. some kind of time warp that replays everything at, say, 100x speed. This has a significant drawback, though: at 100x speed, the time it takes to process each value becomes much more significant relative to the compressed gaps between values.
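A related option in Rx.NET is virtual time rather than a fixed speed-up: a HistoricalScheduler's clock only advances when it runs queued work, so the recorded gaps are skipped instead of merely shortened. The sketch below is an illustration under assumptions: it uses the System.Reactive NuGet package, the `recording` data is hypothetical, and the approach only helps if every time-based operator in the pipeline is explicitly given this scheduler.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

var start = new DateTimeOffset(2011, 1, 3, 9, 0, 0, TimeSpan.Zero);

// Hypothetical recording: when each value originally arrived, and the value.
var recording = new[]
{
    (At: start.AddSeconds(1), Value: 1),
    (At: start.AddSeconds(5), Value: 2),
    (At: start.AddMinutes(10), Value: 3),
};

// Virtual clock that only advances when the scheduler runs queued work.
var scheduler = new HistoricalScheduler(start);

// Re-emit each value at its original (virtual) arrival time.
var replayed = recording.ToObservable()
    .SelectMany(r => Observable.Timer(r.At, scheduler).Select(_ => r.Value));

var received = new List<(DateTimeOffset VirtualTime, int Value)>();
using (replayed.Subscribe(v => received.Add((scheduler.Now, v))))
{
    // Runs all queued work back to back, jumping the virtual clock to each
    // due time, so there is no real-time waiting between values.
    scheduler.Start();
}

foreach (var r in received)
    Console.WriteLine($"{r.VirtualTime:HH:mm:ss} -> {r.Value}");
```

Each value is observed with the virtual time at which it originally arrived, while the replay itself runs as fast as the machine allows.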

The other idea I was wondering about (and the one leading to my question) is to be able to "step" through all of this, i.e. to have a way to know when an OnNext has been fully handled by all its subscribers. I could then send one value in, wait for everybody to have handled it, get the corresponding output value, and then send the next one. This would have the advantage that everything could be replayed "as fast as my computer allows".
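The stepping idea matches Rx's default behaviour when nothing in the chain introduces concurrency: Subject<T>.OnNext calls its subscribers synchronously and returns only after they, and everything downstream of them, have handled the value. A minimal sketch, assuming the System.Reactive NuGet package and no ObserveOn/SubscribeOn or scheduler-based operators on the replay path; `doubled`, `squared` and `final` are hypothetical stand-ins for the real modules:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var input = new Subject<int>();

// Two hypothetical first-level "modules" subscribing to the same input.
var doubled = input.Select(x => x * 2);
var squared = input.Select(x => x * x);

// A hypothetical final class combining both module outputs pairwise.
var final = doubled.Zip(squared, (d, s) => d + s);

var outputs = new List<int>();
using (final.Subscribe(outputs.Add))
{
    foreach (var value in new[] { 1, 2, 3 })
    {
        // With no concurrency in the chain, OnNext only returns once every
        // subscriber has processed the value, so its output is already here.
        input.OnNext(value);
        Console.WriteLine($"{value} -> {outputs[outputs.Count - 1]}");
    }
}
```

If any module hops threads (ObserveOn, Observable.Start, timers on a real scheduler), OnNext can return before the work is done and this guarantee no longer holds.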

I was wondering if there is something I'm missing, or a way of doing things that I have overlooked, that would easily do what I want. Failing that, the only idea I could come up with is to have proxies/facades for all my classes that intercept every incoming and outgoing value and report it, so that my "replayer" can trace everything.
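Instead of hand-written proxies, Rx's Do operator can observe the values passing any point of a chain without altering them. A minimal sketch, assuming the System.Reactive NuGet package; the `Tap` helper, its names and the log format are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var log = new List<string>();

// Hypothetical helper: records every value that passes a named point
// in the pipeline, without wrapping the modules themselves.
IObservable<T> Tap<T>(IObservable<T> stream, string name) =>
    stream.Do(v => log.Add($"{name}: {v}"));

var input = new Subject<int>();

// A hypothetical module (x + 1) with taps on its input and its output.
var moduleA = Tap(Tap(input, "input").Select(x => x + 1), "moduleA");

using (moduleA.Subscribe(_ => { }))
{
    input.OnNext(1);
    input.OnNext(2);
}

log.ForEach(Console.WriteLine);
```

Note that Do runs once per subscription, so a tapped stream with several subscribers logs each value several times.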

Have I smoked too much pot, or should I switch dealers? ... ;-)

A: 

If I understand your requirements, you want to be able to replay your observable stream of values without the original delay between each pushed value. If that's right, isn't this a case where the Replay extension method can be used?

IConnectableObservable<TSource>
    Replay<TSource>(this IObservable<TSource> source);

Here's the test code I put together to check the behaviour:

var sw = new System.Diagnostics.Stopwatch();
sw.Start();

Action<string, int> writeLine = (t, n) =>
    Console.WriteLine("{0}: {1} @ {2} seconds",
        t, n,
        sw.Elapsed.TotalSeconds.ToString("0.000"));

var source = Observable
    .GenerateWithTime<int, int>
    (0, n => n < 5, n => n + 1, n => n,
    n => TimeSpan.FromSeconds(1.0));

var replay = source.Replay();

using (var replayConnect = replay.Connect())
{
    Action sourceFinally = () =>
    {
        replay.Run(n => writeLine("Replay", n));
    };
    source.Finally(sourceFinally).Run(n => writeLine("Source", n));
}

The output from running this code is:

Source: 0 @ 1.094 seconds
Source: 1 @ 2.115 seconds
Source: 2 @ 3.129 seconds
Source: 3 @ 4.143 seconds
Source: 4 @ 5.158 seconds
Replay: 0 @ 5.199 seconds
Replay: 1 @ 5.201 seconds
Replay: 2 @ 5.201 seconds
Replay: 3 @ 5.201 seconds
Replay: 4 @ 5.201 seconds

If you need to know when the original values were pushed you can use the Timestamp extension method to change the observable from IObservable<T> to IObservable<Timestamped<T>>.

IObservable<Timestamped<TSource>>
    Timestamp<TSource>(this IObservable<TSource> source);
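For reference, a minimal sketch of Timestamp in current Rx.NET, assuming the System.Reactive NuGet package; each Timestamped<T> carries the original Value and a Timestamp, and Observable.Range is only a stand-in source:

```csharp
using System;
using System.Reactive.Linq;

// Attach the time each value was observed; ToList().Wait() blocks until
// the (synchronous) source completes and returns all stamped values.
var stamped = Observable.Range(1, 3).Timestamp().ToList().Wait();

foreach (var s in stamped)
    Console.WriteLine($"{s.Value} @ {s.Timestamp:O}");
```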

I can't tell if you've smoked too much pot, but does that help you to "detox"? ;-)

Enigmativity
A: 

I probably formulated my question badly, so let me rephrase. My problem is not so much "replaying" the values; I can do that just fine, even just by loading them from disk into an IEnumerable, and they will then be fed in as fast as possible. My problem is that, since the whole pipeline is multithreaded by nature, if I send them in just like that, as fast as possible, I lose the ability to trace which input value generated which output value (unlike in your example, they would probably be intertwined with each other).

I thought of Timestamp, but it is not useful to me for several reasons:
1) The resolution of the timestamps is pretty much that of DateTime.Now, i.e. about 10 ms, which is not precise enough.
2) Again, with a time-based approach, if my pipeline takes, say, 5 ms to process a value, those 5 ms suddenly carry too much meaning: within 5 ms many more input values would probably have arrived already, making it impossible to know which one generated which output value.

I know this all sounds very vague and definitely lacks a small sample, but I can't post my whole code as it is too long and complex; I will try at some point to put together a small example. All of this is for a "simulation" environment, where I can replay yesterday's values and retrace what happened and when, but faster.
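One way to keep that traceability without relying on time, sketched under assumptions: tag each recorded input with a sequence number as it enters the pipeline and have every stage carry that number through to its output. This assumes the System.Reactive NuGet package and that the real modules can be changed to propagate such a tag; `recorded` and the doubling stage are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical recorded inputs, e.g. yesterday's values loaded from disk.
double[] recorded = { 10.0, 20.0, 30.0, 40.0 };

// Tag each input with its position in the recording.
var tagged = recorded.ToObservable()
    .Select((value, index) => (Seq: index, Value: value));

// A hypothetical module that processes each item concurrently on the
// task pool, so outputs may come back in any order; the Seq travels along.
var moduleOutput = tagged.SelectMany(t =>
    Observable.Start(() => (t.Seq, Value: t.Value * 2), TaskPoolScheduler.Default));

// Block until the replay completes, then match outputs to inputs by Seq.
IList<(int Seq, double Value)> outputs = moduleOutput.ToList().Wait();

foreach (var o in outputs.OrderBy(x => x.Seq))
    Console.WriteLine($"input #{o.Seq} ({recorded[o.Seq]}) -> {o.Value}");
```

Because the tag travels with the data, the arrival order of outputs no longer matters. Stages that combine several inputs (Zip, CombineLatest and so on) would have to decide which tag or tags to propagate, which this sketch does not cover.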

Thanks a lot for your response though.

Big