All I really want to do is complete two async operations, one right after the other. E.g.

Download website X. Once that's completed, download website Y.

+2  A: 

Use SelectMany over both observables (ToAsync turns each operation into an IObservable). You can call SelectMany directly or use the LINQ query syntax, which is sugar for the same thing:

var result = 
from x in X
from y in Y
select new { x, y };

There are other options but it depends on your specific scenario.
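
To see why the query above runs the operations one after the other, here is a minimal sketch. It assumes the System.Reactive package and uses a hypothetical FakeDownload helper in place of a real download. The second `from` clause sits inside the SelectMany selector, so its operation is only started once the first one has produced its value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SequentialRx
{
    // Hypothetical stand-in for a real download; it records when it starts.
    static string FakeDownload(string name, List<string> log)
    {
        lock (log) { log.Add("start " + name); }
        return name + " content";
    }

    public static List<string> Run()
    {
        var log = new List<string>();

        // Observable.Start runs the function asynchronously and publishes its result.
        // The second Observable.Start is only called once x has arrived.
        var result =
            from x in Observable.Start(() => FakeDownload("X", log))
            from y in Observable.Start(() => FakeDownload("Y", log))
            select new { x, y };

        // Wait() blocks until the sequence completes and returns its last element.
        var pair = result.Wait();
        lock (log) { log.Add("done " + pair.x + " / " + pair.y); }
        return log;
    }
}
```

Calling SequentialRx.Run() should log "start X" before "start Y", since Y is not started until X has returned.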

Richard Hein
Just a word of caution: if 'X' or 'Y' returns more than once, you will start to see cross-join behavior. It works out perfectly for an async operation that returns only once. To be safe, though, the Prune extension method allows only one notification to come through, so it is a good idea for something like a web service call.
McAravey
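
The cross-join effect described in this comment can be reproduced with a small sketch, assuming the System.Reactive package. Observable.Range stands in for sources that emit more than once, and Take(1) is used as a simple substitute for limiting each source to one notification. It is not the Prune method the comment mentions.

```csharp
using System;
using System.Reactive.Linq;

public static class CrossJoinDemo
{
    // Counts how many (x, y) pairs the SelectMany query produces.
    public static int CountPairs(IObservable<int> xs, IObservable<int> ys)
    {
        var pairs =
            from x in xs
            from y in ys
            select new { x, y };

        // Count() yields a single value; Wait() blocks until it is available.
        return pairs.Count().Wait();
    }

    // Two values on each side: every x is paired with every y.
    public static int Unrestricted()
    {
        return CountPairs(Observable.Range(1, 2), Observable.Range(10, 2));
    }

    // Each side limited to a single notification: exactly one pair.
    public static int SingleValue()
    {
        return CountPairs(Observable.Range(1, 2).Take(1), Observable.Range(10, 2).Take(1));
    }
}
```

With two values per source the query yields four pairs; with each source limited to one value it yields a single pair.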
+1  A: 

I don't like suggestions like this one myself, but...

If you need to perform two operations sequentially, do them sequentially (you can still do them on a thread different from main).

If you still want to separate the tasks in code, then the Task construct from the Task Parallel Library (System.Threading.Tasks) would be appropriate:

var task1 = Task.Factory.StartNew (() => FirstTask());
var task2 = task1.ContinueWith (frst => SecondTask ());
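
As a slightly fuller sketch of the same idea, the continuation can also receive the first task's result. The two delegates below are hypothetical stand-ins for FirstTask and SecondTask; only types from System.Threading.Tasks are used.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SequentialTasks
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical stand-ins for the two downloads.
        Func<string> firstTask = () => { log.Add("first"); return "X"; };
        Func<string, string> secondTask = x => { log.Add("second"); return x + "Y"; };

        Task<string> task1 = Task.Factory.StartNew(firstTask);

        // The continuation is scheduled only after task1 has finished,
        // so reading first.Result here does not block.
        Task<string> task2 = task1.ContinueWith(first => secondTask(first.Result));

        // Reading Result blocks the caller until the continuation has run.
        log.Add("result " + task2.Result);
        return log;
    }
}
```

SequentialTasks.Run() logs "first", then "second", then the combined result, because the continuation cannot start before the first task completes.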

And if this is a way to grok System.Reactive, try Observable.GenerateInSequence, but it will surely be overkill. Remember, an observable is the counterpart to an enumerable, and it is better used in a similar way (iterating over data). It is not a tool to run async operations.

Edit: I want to admit that I was wrong and Richard was right. At the time of my reply I wasn't fully comfortable with Rx. Now I think that Rx is the most natural way to start asynchronous operations. Still, Richard's answer is a bit skimpy; it should be:

var result =  
from x in XDownload.ToAsync()() 
from y in YDownload.ToAsync()() 
select y;
Sergey Aldoukhov
"It is not a tool to run async operations." I'd totally disagree with that statement, the continuation monad (Rx) is exactly what we need for composition of async operations, because we pass any state explicitly and all side-effects are managed. One of the primary reasons for creating Rx in the first place is async and parallel programming.
Richard Hein
Observing async operations - yes. Creating async operations from the observable pipeline - yes. Using Observable to launch async tasks (especially one or two) without observing the result - this would be overcomplicating.
Sergey Aldoukhov
Yes, the whole point of Rx is the composability of async operations. I just finished writing an application that relies heavily on Rx, and I use code that looks exactly like Richard's example. In one place I needed to make up to 18 web service requests to load data, and I would have needed 10 times as much code without Rx.
McAravey
+1  A: 

Maybe something along the following lines:

    static IObservable<DownloadProgressChangedEventArgs> CreateDownloadFileObservable(string url, string fileName)
    {
        return Observable.CreateWithDisposable<DownloadProgressChangedEventArgs>(o =>
        {
            var cancellationTokenSource = new CancellationTokenSource();
            Scheduler.TaskPool.Schedule(() =>
            {
                // Artificial delay (presumably for the demo) so the subscription
                // can be disposed before the download starts.
                Thread.Sleep(3000);
                if (cancellationTokenSource.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancelling download of {0}", fileName);
                    return;
                }

                WebClient client = new WebClient();

                // Forward each progress notification to the observer.
                DownloadProgressChangedEventHandler prgChangedHandler = (s, e) => o.OnNext(e);

                AsyncCompletedEventHandler handler = null;
                handler = (s, e) =>
                {
                    // Detach both handlers from the client once the download ends.
                    client.DownloadProgressChanged -= prgChangedHandler;
                    client.DownloadFileCompleted -= handler;

                    // Send exactly one terminal notification: error or completion.
                    if (e.Error != null)
                    {
                        o.OnError(e.Error);
                    }
                    else
                    {
                        o.OnCompleted();
                    }
                };

                // Attach the handlers before starting so that no event is missed.
                client.DownloadFileCompleted += handler;
                client.DownloadProgressChanged += prgChangedHandler;

                // fileName doubles as the user token, which surfaces as UserState.
                client.DownloadFileAsync(new Uri(url), fileName, fileName);
            });

            // Disposing the subscription requests cancellation. Returning the
            // CancellationTokenSource itself would only dispose it, not cancel it.
            return Disposable.Create(() => cancellationTokenSource.Cancel());
        });
    }

    static void Main(string[] args)
    {
        var obs1 = CreateDownloadFileObservable("http://www.cnn.com", "cnn.htm");
        var obs2 = CreateDownloadFileObservable("http://www.bing.com", "bing.htm");

        var result = obs1.Concat(obs2);
        var subscription = result.Subscribe(
            a => Console.WriteLine("{0} -- {1}% complete ", a.UserState, a.ProgressPercentage),
            e => Console.WriteLine(e.Message),
            () => Console.WriteLine("Completed"));
        Console.ReadKey();
        subscription.Dispose();
        Console.WriteLine("Press a key to exit");
        Console.ReadKey();
    }
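
The ordering that Concat provides can be checked without any network access. The following is a minimal sketch, assuming the current System.Reactive package rather than the early Rx build used above; FakeDownload is a hypothetical cold observable that does nothing until it is subscribed to.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ConcatDemo
{
    // Hypothetical cold "download": the body runs only on subscription.
    static IObservable<string> FakeDownload(string name, List<string> log)
    {
        return Observable.Defer(() =>
        {
            log.Add("start " + name);
            return Observable.Return(name + " done");
        });
    }

    public static List<string> Run()
    {
        var log = new List<string>();

        // Concat subscribes to the second source only after the first completes.
        var result = FakeDownload("cnn", log).Concat(FakeDownload("bing", log));

        // Record each notification, then block until the whole sequence completes.
        result.Do(msg => log.Add(msg)).Wait();
        return log;
    }
}
```

ConcatDemo.Run() logs "start cnn", "cnn done", "start bing", "bing done", in that order: the second download does not begin until the first has completed.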

You could turn CreateDownloadFileObservable into an extension method on WebClient and remove the WebClient client = new WebClient(); line from the method definition. The calling code would then look like this:

WebClient c = new WebClient();
c.CreateDownloadFileObservable("http://www.bing.com", "bing.htm");
Abhijeet Patel