
I'm using the TPL but I'm finding it tricky unit testing code that uses it.

I'm trying to not introduce a wrapper over it as I feel it could introduce issues.

I understand that you can set processor affinity in the TPL, but what would be really nice is a way to set a maximum thread count (probably per app domain). Then, with the maximum set to 1, the TPL would be forced to use whatever thread it was called from.

What do you think? Is this possible (I'm pretty sure it's not), and should it be possible?

Edit: here's an example

public class Foo
{
    public int Bar { get; private set; }

    public Foo( )
    {
        Task.Factory.StartNew( () => somethingLong( ) )
            .ContinueWith( a => Bar = 1 ) ;
    }

    private void somethingLong( ) { /* long-running work */ }
}

[Test] public void Foo_should_set_Bar_to_1( )
{
    Assert.Equal(1, new Foo( ).Bar ) ;
}

The test probably won't pass unless I introduce a delay, because the continuation that sets Bar may not have run by the time the assert executes. I'd like to have something like Task.MaximumThreads = 1 so that the TPL would run everything serially.

+2  A: 

You could create your own TaskScheduler class, deriving from TaskScheduler, and pass it into a TaskFactory. Any Task objects you create through that factory will then run against your scheduler.

No need to set it to use one thread.
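The wiring might look roughly like this (a sketch only: TestTaskScheduler stands in for whatever custom scheduler you write, and DoSomethingLong/result are placeholders for your own code):

var scheduler = new TestTaskScheduler();        // your custom TaskScheduler
var factory = new TaskFactory(scheduler);

// Tasks created through this factory are queued to your scheduler,
// not to the default ThreadPool scheduler.
factory.StartNew(() => DoSomethingLong())
       .ContinueWith(t => result = 1, scheduler);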

Then, right before your asserts, just call Dispose() on the scheduler. If you follow the published samples for writing a TaskScheduler, internally Dispose will do something like this:

public void Dispose()
{
    if (tasks != null)
    {
        tasks.CompleteAdding();

        foreach (var thread in threads) thread.Join();

        tasks.Dispose();
        tasks = null;
    }
}

That will guarantee that all the Tasks have been run. Now you can move ahead with your Asserts.
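Put together, the shape of such a test could be roughly this (a sketch: MyTestScheduler is a placeholder for your custom scheduler, and it assumes Foo can be handed a TaskScheduler, as in the next answer's example):

var scheduler = new MyTestScheduler();
var foo = new Foo(scheduler);   // the code under test queues its tasks to this scheduler
foo.DoWork();

scheduler.Dispose();            // blocks until every queued task has finished
Assert.Equal(1, foo.Bar);       // now safe to assert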

You could also use ContinueWith(...) to add assertions after a Task has run, if you want to check progress as things happen.
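For example, something along these lines (a sketch: task is the Task returned by the code under test and foo is the object it updates):

task.ContinueWith(t =>
{
    Assert.True(t.IsCompleted);
    Assert.Equal(1, foo.Bar);
}).Wait();   // wait, so a failed assertion surfaces in the test instead of being lost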

Hightechrider
Thanks, this is a very interesting idea. I'll take a look.
Steve Dunn
+1  A: 

Really this is more of an issue with the testability of lambda-heavy code than it is with the TPL. Hightechrider's suggestion is a good one, but essentially your tests are then testing the TPL as much as they are testing your code. You don't really need to test that the ContinueWith continuation starts when the first task completes; that is the TPL's job.

If the code inside your lambdas is significant, pulling it out into a method with clearly defined parameters usually gives you code that is both easier to read and easier to test; you can write unit tests directly around that method (see the sketch below). Where possible I try to limit or remove parallelism from my unit tests.
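For instance, a rough sketch of what pulling the logic out might look like for the Foo example (CalculateBar is an illustrative name, not from the original code):

using System.Threading;
using System.Threading.Tasks;
using Xunit;

public class Foo
{
    public int Bar { get; private set; }

    // The work that used to live inside the lambda, now a plain, testable method.
    public int CalculateBar()
    {
        SomethingLong();
        return 1;
    }

    public void DoWork()
    {
        // The task plumbing stays thin; the logic lives in CalculateBar.
        Task.Factory.StartNew(() => Bar = CalculateBar());
    }

    private void SomethingLong() { Thread.SpinWait(10000); }
}

public class FooLogicTests
{
    [Fact]
    public void CalculateBar_returns_1()
    {
        // No tasks and no timing: the extracted logic is tested synchronously.
        Assert.Equal(1, new Foo().CalculateBar());
    }
}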

Having said that, I wanted to see if the scheduler approach would work. Here's an implementation using a modified StaTaskScheduler from http://code.msdn.microsoft.com/ParExtSamples:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Example
{
  public class Foo
  {
    private TaskScheduler _scheduler;

    public int Bar { get; set; }

    private void SomethingLong()
    {
      Thread.SpinWait(10000);
    }

    public Foo()
      : this(TaskScheduler.Default)
    {
    }

    public Foo(TaskScheduler scheduler)
    {
      _scheduler = scheduler;
    }

    public void DoWork()
    {
      var factory = new TaskFactory(_scheduler);

      factory.StartNew(() => SomethingLong())
        .ContinueWith(a => Bar = 1, _scheduler);
    }
  }

  public class FooTests
  {
    [Fact]
    public void Foo_should_set_Bar_to_1()
    {
      var sch = new StaTaskScheduler(3);
      var target = new Foo(sch);
      target.DoWork();

      sch.Dispose();
      Assert.Equal(1, target.Bar);
    }
  }

  public sealed class StaTaskScheduler : TaskScheduler, IDisposable
  {
    /// <summary>Stores the queued tasks to be executed by our pool of STA threads.</summary>
    private BlockingCollection<Task> _tasks;
    /// <summary>The STA threads used by the scheduler.</summary>
    private readonly List<Thread> _threads;

    /// <summary>Initializes a new instance of the StaTaskScheduler class with the specified concurrency level.</summary>
    /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
    public StaTaskScheduler(int numberOfThreads)
    {
      // Validate arguments
      if (numberOfThreads < 1) throw new ArgumentOutOfRangeException("numberOfThreads");

      // Initialize the tasks collection
      _tasks = new BlockingCollection<Task>();

      // Create the threads to be used by this scheduler
      _threads = Enumerable.Range(0, numberOfThreads).Select(i =>
      {
        var thread = new Thread(() =>
        {
          // Continually get the next task and try to execute it.
          // This will continue until the scheduler is disposed and no more tasks remain.
          foreach (var t in _tasks.GetConsumingEnumerable())
          {
            TryExecuteTask(t);
          }
        });
        thread.IsBackground = true;
        // NO STA REQUIREMENT!
        // thread.SetApartmentState(ApartmentState.STA);
        return thread;
      }).ToList();

      // Start all of the threads
      _threads.ForEach(t => t.Start());
    }

    /// <summary>Queues a Task to be executed by this scheduler.</summary>
    /// <param name="task">The task to be executed.</param>
    protected override void QueueTask(Task task)
    {
      // Push it into the blocking collection of tasks
      _tasks.Add(task);
    }

    /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
    /// <returns>An enumerable of all tasks currently scheduled.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
      // Serialize the contents of the blocking collection of tasks for the debugger
      return _tasks.ToArray();
    }

    /// <summary>Determines whether a Task may be inlined.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
      // Try to inline if the current thread is STA
      return
      Thread.CurrentThread.GetApartmentState() == ApartmentState.STA &&
      TryExecuteTask(task);
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override int MaximumConcurrencyLevel
    {
      get { return _threads.Count; }
    }

    /// <summary>
    /// Cleans up the scheduler by indicating that no more tasks will be queued.
    /// This method blocks until all threads successfully shutdown.
    /// </summary>
    public void Dispose()
    {
      if (_tasks != null)
      {
        // Indicate that no new tasks will be coming in
        _tasks.CompleteAdding();

        // Wait for all threads to finish processing tasks
        foreach (var thread in _threads) thread.Join();

        // Cleanup
        _tasks.Dispose();
        _tasks = null;
      }
    }
  }
}
Ade Miller
Thanks for that Ade. I too have found that separating out the code inside the lambda into testable methods is the best way to go. In some places, I have a constructor overload that takes a bool to indicate synchronous or asynchronous operation. This kind of defeats the goal of hiding the implementation from users of my type, but it seems to be a good compromise at the moment.
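Something like this, in simplified form (the flag name is just for illustration):

public class Foo
{
    public int Bar { get; private set; }

    public Foo( ) : this( false ) { }

    // Tests pass true so the work runs inline on the calling thread.
    public Foo( bool runSynchronously )
    {
        if ( runSynchronously )
        {
            somethingLong( );
            Bar = 1;
        }
        else
        {
            Task.Factory.StartNew( () => somethingLong( ) )
                .ContinueWith( a => Bar = 1 ) ;
        }
    }

    private void somethingLong( ) { /* long-running work */ }
}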
Steve Dunn
A: 

If you want to avoid overloading the constructor, you can wrap the unit test code in a Task.Factory.ContinueWhenAll(...).

public class Foo
{
    public Foo( )
    {
        Task.Factory.StartNew( () => somethingLong( ) )
            .ContinueWith( a => Bar = 1 ) ;
    }
}

[Test] public void Foo_should_set_Bar_to_1( )
{
    Foo foo = null;
    Task.Factory.ContinueWhenAll(
        new [] {
            // The task must actually be started, otherwise ContinueWhenAll never fires
            Task.Factory.StartNew(() => {
                foo = new Foo();
            })
        },
        asserts => {
            Assert.Equal(1, foo.Bar ) ;
        }
    ).Wait();
}

I'd be keen to hear some feedback on this approach.

Jason