I am chaining together 15 async operations through ports and receivers. This has left me very concerned with the interthread messaging time, specifically the time it takes between a task posting data to a port, and a new task begins processing that same data on a different thread. Assuming best case situation where each thread is idle at start, I have generated a test which uses the stop watch class to measure the time from two different dispatchers each operating at highest priority with a single thread.
What I found surprised me, my development rig is a Q6600 Quad Core 2.4 Ghz computer running Windows 7 x64, and the average context switch time from my test was 5.66 microseconds with a standard deviation of 5.738 microseconds, and a maximum of nearly 1.58 milliseconds (a factor of 282!). The Stopwatch Frequency is 427.7 nano seconds, so I am still well out of sensor noise.
What I would like to do is reduce the interthread messaging time as much as possible, and equally important, reduce the standard deviation of the context switch. I realize Windows is not a Real Time OS, and there are not guarantees, but the windows scheduler is a fair round robin priority based schedule, and the two threads in this test are both at the highest priority (the only threads that should be that high), so there should not be any context switches on the threads (evident by the 1.58 ms largest time... I believe windows quanta is 15.65 ms?) The only thing I can think of is variation in the timing of the OS calls to the locking mechanisms used by the CCR to pass messages between threads.
Please let me know if anyone else out there has measured interthread messaging time, and has any suggestions on how to improve it.
Here is the source code from my tests:
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.Ccr.Core;
using System.Diagnostics;
namespace Test.CCR.TestConsole
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Starting Timer");
var sw = new Stopwatch();
sw.Start();
var dispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "My Thread Pool");
var dispQueue = new DispatcherQueue("Disp Queue", dispatcher);
var sDispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "Second Dispatcher");
var sDispQueue = new DispatcherQueue("Second Queue", sDispatcher);
var legAPort = new Port<EmptyValue>();
var legBPort = new Port<TimeSpan>();
var distances = new List<double>();
long totalTicks = 0;
while (sw.Elapsed.TotalMilliseconds < 5000) ;
int runCnt = 100000;
int offset = 1000;
Arbiter.Activate(dispQueue, Arbiter.Receive(true, legAPort, i =>
{
TimeSpan sTime = sw.Elapsed;
legBPort.Post(sTime);
}));
Arbiter.Activate(sDispQueue, Arbiter.Receive(true, legBPort, i =>
{
TimeSpan eTime = sw.Elapsed;
TimeSpan dt = eTime.Subtract(i);
//if (distances.Count == 0 || Math.Abs(distances[distances.Count - 1] - dt.TotalMilliseconds) / distances[distances.Count - 1] > 0.1)
distances.Add(dt.TotalMilliseconds);
if(distances.Count > offset)
Interlocked.Add(ref totalTicks,
dt.Ticks);
if(distances.Count < runCnt)
legAPort.Post(EmptyValue.SharedInstance);
}));
//Thread.Sleep(100);
legAPort.Post(EmptyValue.SharedInstance);
Thread.Sleep(500);
while (distances.Count < runCnt)
Thread.Sleep(25);
TimeSpan exTime = TimeSpan.FromTicks(totalTicks);
double exMS = exTime.TotalMilliseconds / (runCnt - offset);
Console.WriteLine("Exchange Time: {0} Stopwatch Resolution: {1}", exMS, Stopwatch.Frequency);
using(var stw = new StreamWriter("test.csv"))
{
for(int ix=0; ix < distances.Count; ix++)
{
stw.WriteLine("{0},{1}", ix, distances[ix]);
}
stw.Flush();
}
Console.ReadKey();
}
}
}