views: 1016
answers: 4

I have an event which I would like to have processed in a parallel manner. My idea is to add each callback to the ThreadPool, effectively having every method that registered with the event handled by the ThreadPool.

My try-out code looks something like the following:

Delegate[] delegates = myEvent.GetInvocationList();
IAsyncResult[] results = new IAsyncResult[delegates.Length];

for (int i = 0; i < delegates.Length; i++)
{
    results[i] = ((TestDelegate)delegates[i]).BeginInvoke("BeginInvoke/EndInvoke", null, null);
}

for (int i = 0; i < delegates.Length; i++)
{
    ((TestDelegate)delegates[i]).EndInvoke(results[i]);
}

This is just for playing around since I am curious how to do it. I am sure there is a better way to do it. I don't like having a Func which creates a WaitCallback that holds a lambda. Also, DynamicInvoke is pretty slow compared to calling a delegate directly. I doubt this way of processing the event is any faster than just doing it sequentially.

My question is: How can I process an event in a parallel manner, preferably by using ThreadPool?

Since I usually work with Mono, neither .NET 4.0 nor the Task Parallel Library is an option.

Thank you!

EDITS: Corrected the example thanks to Earwicker's answer; updated the try-out code.

+1  A: 

You appear to be doing the asynchronous launching twice in your code snippet.

First you call BeginInvoke on a delegate - this queues a work item so that the thread pool will execute the delegate.

Then inside that delegate, you use QueueUserWorkItem to... queue another work item so the thread pool will execute the real delegate.

This means that when you get back an IAsyncResult (and hence a wait handle) from the outer delegate, it will signal completion when the second work item has been queued, not when it has finished executing.
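The difference can be sketched in a few lines (a minimal illustration, using a hypothetical TestDelegate matching the question's; not the asker's actual code):

```csharp
using System;
using System.Threading;

// Hypothetical delegate type, matching the question's TestDelegate.
delegate void TestDelegate(string s);

static class QueueingComparison
{
    // Wrong: the outer BeginInvoke queues a work item whose only job is
    // to queue yet another work item. Its IAsyncResult therefore signals
    // completion when the inner item has been *queued*, not when the
    // real handler has finished running.
    public static void DoubleQueued(TestDelegate handler, string arg)
    {
        TestDelegate outer = s => ThreadPool.QueueUserWorkItem(_ => handler(s));
        IAsyncResult ar = outer.BeginInvoke(arg, null, null);
        outer.EndInvoke(ar); // may return before handler(arg) has executed
    }

    // Right: BeginInvoke alone already runs the handler on the thread pool.
    public static void SingleQueued(TestDelegate handler, string arg)
    {
        IAsyncResult ar = handler.BeginInvoke(arg, null, null);
        handler.EndInvoke(ar); // blocks until the handler itself has finished
    }
}
```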

Daniel Earwicker
You are right; at first I did not have BeginInvoke there, it crept in while playing around. I'll correct the example!
galaktor
+5  A: 

I'd go for an approach using DynamicMethod (LCG) and a state object which carries the arguments and does keep track of the calls (so that you can wait for them to complete).

Code: something like this should do (not thoroughly tested yet, though, so it may throw some nasty exceptions in some situations):

/// <summary>
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, [email protected]
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
public static class ParallelInvoke {
 private class ParallelInvokeContext<TDelegate> where TDelegate: class {
  private static readonly DynamicMethod invoker;
  private static readonly Type[] parameterTypes;

  static ParallelInvokeContext() {
   if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
    throw new InvalidOperationException("The TDelegate type must be a delegate");
   }
   Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
   Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
   Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
   FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
   Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
   FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
   Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
   MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
   Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
   if (delegate_invoke.ReturnType != typeof(void)) {
    throw new InvalidOperationException("The TDelegate delegate must not have a return value");
   }
   ParameterInfo[] parameters = delegate_invoke.GetParameters();
   parameterTypes = new Type[parameters.Length];
   invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
                               typeof(ParallelInvokeContext<TDelegate>), true);
   ILGenerator il = invoker.GetILGenerator();
   LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
   bool skipLoad = false;
   il.BeginExceptionBlock();
   il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
   if (args != null) {
    Debug.Assert(args.LocalIndex == 0);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
    il.Emit(OpCodes.Dup);
    il.Emit(OpCodes.Stloc_0);
    skipLoad = true;
   }
   foreach (ParameterInfo parameter in parameters) {
    if (parameter.ParameterType.IsByRef) {
     throw new InvalidOperationException("The TDelegate delegate must not have out or ref parameters");
    }
    parameterTypes[parameter.Position] = parameter.ParameterType;
    if (args == null) {
     il.Emit(OpCodes.Ldarg_0);
     il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
    } else if (skipLoad) {
     skipLoad = false;
    } else {
     il.Emit(OpCodes.Ldloc_0);
    }
    il.Emit(OpCodes.Ldc_I4, parameter.Position);
    il.Emit(OpCodes.Ldelem_Ref);
    if (parameter.ParameterType.IsValueType) {
     il.Emit(OpCodes.Unbox_Any, parameter.ParameterType);
    }
   }
   il.Emit(OpCodes.Callvirt, delegate_invoke);
   il.BeginFinallyBlock();
   il.Emit(OpCodes.Ldarg_0);
   il.Emit(OpCodes.Call, monitor_enter);
   il.Emit(OpCodes.Ldarg_0);
   il.Emit(OpCodes.Dup);
   il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls);
   il.Emit(OpCodes.Ldc_I4_1);
   il.Emit(OpCodes.Sub);
   il.Emit(OpCodes.Dup);
   Label noPulse = il.DefineLabel();
   il.Emit(OpCodes.Brtrue, noPulse);
   il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
   il.Emit(OpCodes.Ldarg_0);
   il.Emit(OpCodes.Call, monitor_pulse);
   Label exit = il.DefineLabel();
   il.Emit(OpCodes.Br, exit);
   il.MarkLabel(noPulse);
   il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
   il.MarkLabel(exit);
   il.Emit(OpCodes.Ldarg_0);
   il.Emit(OpCodes.Call, monitor_exit);
   il.EndExceptionBlock();
   il.Emit(OpCodes.Ret);
  }

  [Conditional("DEBUG")]
  private static void VerifyArgumentsDebug(object[] args) {
   for (int i = 0; i < parameterTypes.Length; i++) {
    if (args[i] == null) {
     if (parameterTypes[i].IsValueType) {
      throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
     }
    } else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
     throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
    }
   }
  }

  private readonly object[] arguments;
  private readonly WaitCallback invokeCallback;
  private int activeCalls;

  public ParallelInvokeContext(object[] args) {
   if (parameterTypes.Length > 0) {
    if (args == null) {
     throw new ArgumentNullException("args");
    }
    if (args.Length != parameterTypes.Length) {
     throw new ArgumentException("The parameter count does not match");
    }
    VerifyArgumentsDebug(args);
    arguments = args;
   } else if ((args != null) && (args.Length > 0)) {
    throw new ArgumentException("This delegate does not expect any parameters");
   }
   invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
  }

  public void QueueInvoke(Delegate @delegate) {
   Debug.Assert(@delegate is TDelegate);
   activeCalls++;
   ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
  }
 }

 private static readonly MethodInfo monitor_enter;
 private static readonly MethodInfo monitor_exit;
 private static readonly MethodInfo monitor_pulse;

 static ParallelInvoke() {
  monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
  monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
  monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
 }

 public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
  Invoke(@delegate, null);
 }

 public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
  if (@delegate == null) {
   throw new ArgumentNullException("delegate");
  }
  ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
  lock (context) {
   foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
    context.QueueInvoke(invocationDelegate);
   }
   Monitor.Wait(context);
  }
 }
}

Usage:

ParallelInvoke.Invoke(yourDelegate, arguments);
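A fuller usage sketch might look like this (hypothetical event and handlers, for illustration only; assumes the ParallelInvoke class above is in scope):

```csharp
using System;

// Hypothetical event delegate and subscribers, for illustration only.
public delegate void MessageHandler(string message);

public static class Demo
{
    private static event MessageHandler MessageReceived;

    public static void Main()
    {
        MessageReceived += m => Console.WriteLine("Handler A: " + m);
        MessageReceived += m => Console.WriteLine("Handler B: " + m);
        MessageReceived += m => Console.WriteLine("Handler C: " + m);

        // Each subscriber runs on a thread-pool thread;
        // Invoke blocks until all of them have completed.
        ParallelInvoke.Invoke(MessageReceived, "hello");
    }
}
```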

Notes:

  • Exceptions in the event handlers are not handled (but the IL code has a finally block to decrement the counter, so that the method should end correctly), and this could cause trouble. It would be possible to catch and transfer the exceptions in the IL code as well.

  • Implicit conversions other than inheritance (such as int to double) are not performed and will throw an exception.

  • The synchronization technique used does not allocate OS wait handles, which is usually good for performance. A description of the Monitor workings can be found on Joseph Albahari's page.

  • After some performance testing, it seems that this approach scales much better than any approach using the "native" BeginInvoke/EndInvoke calls on delegates (at least in the MS CLR).
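For readers who don't want to trace the IL: the counting trick from the notes above can be written in plain C#. This is a simplified sketch of what the generated finally block does (the class name and method names are mine, not part of the answer's code):

```csharp
using System.Threading;

// Simplified C# equivalent of the countdown in the generated IL: each
// completed call decrements activeCalls under the lock and pulses the
// waiting thread when the count reaches zero. No OS wait handles are
// allocated; only the Monitor associated with the object is used.
class CountdownContext
{
    private int activeCalls;

    // Called once per queued work item, before any item can complete
    // (in ParallelInvoke this happens while the caller holds the lock).
    public void Register() { activeCalls++; }

    public void Completed()
    {
        lock (this)
        {
            if (--activeCalls == 0)
            {
                Monitor.Pulse(this); // wake the thread blocked in WaitAll
            }
        }
    }

    public void WaitAll()
    {
        lock (this)
        {
            while (activeCalls > 0)
            {
                Monitor.Wait(this); // releases the lock while waiting
            }
        }
    }
}
```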

Lucero
That would be very interesting, looking forward to your sample.
galaktor
Posted some code, but I only did a quick test. Seems to work though.
Lucero
Thank you for the code! It seems to work, and gladly it is much easier to use than it is to read ;-) I ran a small performance test comparing your code with direct invocation of the event and then with the method I posted in the question. Results were: ParallelInvoke: 86 ms, direct invoke: 21 ms, my attempt: 36 ms. After several runs the proportions stay the same. Would there be anything obviously wrong with keeping my version? I can provide my test code if anyone is curious.
galaktor
My code needs a warm-up (the two static constructors, and the first call to the dynamically generated method) in order to perform well, and it is a completely generic solution working with any delegate and any number of arguments, both value and reference types (no byref types, though, because that wouldn't really make sense). I can think of no real reason for it being slower than any DynamicInvoke solution, but this may also depend on the runtime used (Mono vs. MS CLR). So if you can show me some of your code, I'd be interested in investigating the cause of the slow performance.
Lucero
Okay, so I did some tests of my own. Even calling BeginInvoke/EndInvoke directly on the delegates in the invocation list takes twice as long as my code (not counting one warm-up call), and this is without any further optimization of my code and also without DynamicInvoke or additional "proxy" methods (your Func<,>). I'll post another answer with this code so that you can compare it yourself.
Lucero
Thanks. I'll run your test code when I get the chance. My benchmark did not warm up your code, so I'll give it another try, too. I'll get back to you afterwards :-)
galaktor
I posted a new version, which should be significantly faster (especially as release build - the debug build does more checks which are not absolutely required).
Lucero
+3  A: 

If the type of the delegate is known, you can call BeginInvoke on each entry directly and store the IAsyncResults in an array to wait for and end the calls. Note that you should call EndInvoke in order to avoid potential resource leaks. The code relies on the fact that EndInvoke waits until the call is finished, so no WaitAll is required (and, mind you, WaitAll has several issues, so I'd avoid its use).

Here is a code sample, which is at the same time a simplistic benchmark for the different approaches:

public static class MainClass {
 private delegate void TestDelegate(string x);

 private static void A(string x) {}

 private static void Invoke(TestDelegate test, string s) {
  Delegate[] delegates = test.GetInvocationList();
  IAsyncResult[] results = new IAsyncResult[delegates.Length];
  for (int i = 0; i < delegates.Length; i++) {
   results[i] = ((TestDelegate)delegates[i]).BeginInvoke("string", null, null);
  }
  for (int i = 0; i < delegates.Length; i++) {
   ((TestDelegate)delegates[i]).EndInvoke(results[i]);
  }
 }

 public static void Main(string[] args) {
  Console.WriteLine("Warm-up call");
  TestDelegate test = A;
  test += A;
  test += A;
  test += A;
  test += A;
  test += A;
  test += A;
  test += A;
  test += A;
  test += A; // 10 times in the invocation list
  ParallelInvoke.Invoke(test, "string"); // warm-up
  Stopwatch sw = new Stopwatch();
  GC.Collect();
  GC.WaitForPendingFinalizers();
  Console.WriteLine("Profiling calls");
  sw.Start();
  for (int i = 0; i < 100000; i++) {
   // ParallelInvoke.Invoke(test, "string"); // profiling ParallelInvoke
   Invoke(test, "string"); // profiling native BeginInvoke/EndInvoke
  }
  sw.Stop();
  Console.WriteLine("Done in {0} ms", sw.ElapsedMilliseconds);
  Console.ReadKey(true);
 }
}

On my very old laptop this takes 95553 ms with BeginInvoke/EndInvoke vs. 9038 ms with my ParallelInvoke approach (MS .NET 3.5). So this approach does not scale well compared to the ParallelInvoke solution.

Lucero
For explanations on the performance issue with BeginInvoke/EndInvoke, see: http://stackoverflow.com/questions/532791/whats-the-difference-between-queueuserworkitem-and-begininvoke-for-performi
Lucero
Thanks for pointing that out - I would have dismissed the WaitAll approach anyway due to its limitations! As I already stated, I will give your test code a try as soon as I have the chance.
galaktor
I ran my code with a warm-up and a longer test run for better comparison. With 100,000 runs each, ParallelInvoke took ~16 seconds (15948 ms) while Begin/EndInvoke ran for ~19 seconds (19149 ms). So your solution seems to be much faster, which is why I am marking your ParallelInvoke code as the right answer. Thanks again.
galaktor
You're welcome. On what runtime did you run your tests, was that Mono? Because in my case Begin/EndInvoke performed much worse. Also, please note that I made a small edit to ParallelInvoke and introduced the invokeCallback field, which makes it perform better when there are multiple handlers in the invocation list. So if you ran the test with the old code, maybe make this change as well to get even better performance.
Lucero
No, I ran it on MS.NET. I have been noticing that my unit tests on this code seem to run much faster when run under Mono compared to MS.NET.
galaktor
How many delegates did you have in the invocation list? The difference was becoming much bigger in my tests when more items were added to the invocation list.
Lucero
I added 100,000 delegates to the event's list and then invoked the event in 3 different ways: ParallelInvoke,Begin/EndInvoke and directly invoking the event the standard way.
galaktor
Ah, interesting, but the bottleneck (or the part using most of the benchmark time) in this case was the thread pool and not the calls themselves. I suggest that you also try with 10-20 handlers in the list and issue a bunch of calls (100,000 is fine), as this will not stress the thread pool as much but rather the async call "frameworks" themselves.
Lucero
Now I had 100,000 calls of the event holding 20 delegates. Results do not differ that much: ParallelInvoke: 33478 ms, Begin/EndInvoke: 43797 ms, direct invoke: 95 ms. Your code is still clearly faster, at least on my Core2Duo laptop.
galaktor
Thanks for the information, so the bigger difference on my computer is maybe due to the older CPU (cache size or whatever), I tested it with a Pentium M 1.5GHz (from the good old days back in 2003).
Lucero
I ran the same test on Mono for Windows 6 times; here are the interesting average results: ParallelInvoke: 66309.2 ms, direct invoke: 162.167 ms, Begin/EndInvoke: 24741.7 ms. Apparently the code behind Begin/EndInvoke is implemented much more efficiently on Mono than on MS.NET; ParallelInvoke gets much worse results, though.
galaktor
So I guess Mono doesn't rely on the remoting architecture for Begin/EndInvoke then. On the other hand, maybe DynamicMethods are not as efficient in Mono (or maybe just the Unbox_Any IL instruction); I don't see many other places where so much time could be lost. Still, I believe my code is a showcase of how to create invocation proxy methods which take an object[] just like DynamicInvoke but call the method using compiled IL code, and are thus more efficient. ;)
Lucero
@galaktor, if you *like* you can send me a mail for further discussion. Mail address is in the source.
Lucero
A: 

Are you doing this for performance?

That will only work if it lets you get multiple pieces of hardware working in parallel, and it will cost you in context-switch overhead.

Mike Dunlavey
It was just an experiment I was curious about. Performance tests showed that both parallel approaches presented here take several hundred times longer than the standard sequential approach.
galaktor