views:

92

answers:

1

Hello everybody,

I have downloaded the last samples of the Parallel Programming team, and I don't succeed in adding correctly the possibility to cancel the download of a file.

Here is the code I ended to have:

var wreq = (HttpWebRequest)WebRequest.Create(uri);

// Fire start event
DownloadStarted(this, new DownloadStartedEventArgs(remoteFilePath));

long totalBytes = 0;

wreq.DownloadDataInFileAsync(tmpLocalFile,
                             cancellationTokenSource.Token,
                             allowResume,
                             totalBytesAction =>
                             {
                                 totalBytes = totalBytesAction;
                             },
                             readBytes =>
                             {
                                 Log.Debug("Progression : {0} / {1} => {2}%", readBytes, totalBytes, 100 * (double)readBytes / totalBytes);
                                 DownloadProgress(this, new DownloadProgressEventArgs(remoteFilePath, readBytes, totalBytes, (int)(100 * readBytes / totalBytes)));
                             })
    .ContinueWith( (antecedent ) =>
                      {
                          if (antecedent.IsFaulted)
                              Log.Debug(antecedent.Exception.Message);

                          //Fire end event
                          SetEndDownload(antecedent.IsCanceled, antecedent.Exception, tmpLocalFile, 0);
                      }, cancellationTokenSource.Token);

I want to fire an end event after the download is finished, hence the ContinueWith.

I slightly changed the code of the samples to add the CancellationToken and the 2 delegates to get the size of the file to download, and the progression of the download:

return webRequest.GetResponseAsync()
    .ContinueWith(response =>
                      {
                          if (totalBytesAction != null)
                              totalBytesAction(response.Result.ContentLength);

                          response.Result.GetResponseStream().WriteAllBytesAsync(filePath, ct, resumeDownload, progressAction).Wait(ct);
                      }, ct);

I had to add the call to the Wait function, because if I don't, the method exits and the end event is fired too early.

Here are the modified method extensions (lot of code, apologies :p)

public static Task WriteAllBytesAsync(this Stream stream, string filePath, CancellationToken ct, bool resumeDownload = false, Action<long> progressAction = null)
{
    if (stream == null) throw new ArgumentNullException("stream");

    // Copy from the source stream to the memory stream and return the copied data
    return stream.CopyStreamToFileAsync(filePath, ct, resumeDownload, progressAction);
}

public static Task CopyStreamToFileAsync(this Stream source, string destinationPath, CancellationToken ct, bool resumeDownload = false, Action<long> progressAction = null)
{
    if (source == null) throw new ArgumentNullException("source");
    if (destinationPath == null) throw new ArgumentNullException("destinationPath");

    // Open the output file for writing
    var destinationStream = FileAsync.OpenWrite(destinationPath);

    // Copy the source to the destination stream, then close the output file.
    return CopyStreamToStreamAsync(source, destinationStream, ct, progressAction).ContinueWith(t =>
    {
        var e = t.Exception;
        destinationStream.Close();

        if (e != null)
            throw e;
    }, ct, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
}

public static Task CopyStreamToStreamAsync(this Stream source, Stream destination, CancellationToken ct, Action<long> progressAction = null)
{
    if (source == null) throw new ArgumentNullException("source");
    if (destination == null) throw new ArgumentNullException("destination");

    return Task.Factory.Iterate(CopyStreamIterator(source, destination, ct, progressAction));
}

private static IEnumerable<Task> CopyStreamIterator(Stream input, Stream output, CancellationToken ct, Action<long> progressAction = null)
{
    // Create two buffers.  One will be used for the current read operation and one for the current
    // write operation.  We'll continually swap back and forth between them.
    byte[][] buffers = new byte[2][] { new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
    int filledBufferNum = 0;
    Task writeTask = null;
    int readBytes = 0;

    // Until there's no more data to be read or cancellation
    while (true)
    {
        ct.ThrowIfCancellationRequested();

        // Read from the input asynchronously
        var readTask = input.ReadAsync(buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);

        // If we have no pending write operations, just yield until the read operation has
        // completed.  If we have both a pending read and a pending write, yield until both the read
        // and the write have completed.
        yield return writeTask == null
                         ? readTask
                         : Task.Factory.ContinueWhenAll(new[]
                                                            {
                                                                readTask,
                                                                writeTask
                                                            },
                                                        tasks => tasks.PropagateExceptions());

        // If no data was read, nothing more to do.
        if (readTask.Result <= 0)
            break;

        readBytes += readTask.Result;

        if (progressAction != null) 
            progressAction(readBytes);

        // Otherwise, write the written data out to the file
        writeTask = output.WriteAsync(buffers[filledBufferNum], 0, readTask.Result);

        // Swap buffers
        filledBufferNum ^= 1;
    }
}

So basically, at the end of the chain of called methods, I let the CancellationToken throw an OperationCanceledException if a Cancel has been requested.

What I hoped was to get IsFaulted == true in the appealing code and to fire the end event with the canceled flags and the correct exception.

But what I get is an unhandled exception on the line

response.Result.GetResponseStream().WriteAllBytesAsync(filePath, ct, resumeDownload, progressAction).Wait(ct);

telling me that I don't catch an AggregateException. I've tried various things, but I don't succeed to make the whole thing work properly.

Does anyone of you have played enough with that library and may help me?

Thanks in advance

Mike

A: 

This is pretty much the behavior I would expect. I've not been able to get my head entirely around your code but I suspect that if you look at the AggregateException.InnerExceptions property you'll find a TaskCancelled or OperationCancelledException in there. Cancelling the token causes an exception to be thrown by cancelled task which is typically caught by the calling code as it waits for the task to complete and this is wrapped in an aggregate.

The following links have more detail.

http://msdn.microsoft.com/en-us/library/dd997396.aspx

http://msdn.microsoft.com/en-us/library/dd997364.aspx

I'm not going to write more as I'm not 100% convinced I understand your code.

Ade Miller