I've created a new class called Actor which processes messages passed to it. The problem I am running into is figuring out the most elegant way to pass related but different messages to the Actor. My first idea is to use inheritance. It seems bloated, but it is strongly typed, which is a definite requirement.

Have any ideas?

Example

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    // Public, because the class has no static factory that could call a private constructor.
    public EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Actor Class

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency.Actor&lt;TMessage, TReply&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency.Actor&lt;TMessage, TReply&gt; class.</summary>
    /// <param name="thread_count">The number of threads used to process messages.</param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="message">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the reply is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages and passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
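                catch
                {
                    // Exceptions are swallowed here so the worker thread keeps running.
                    // Note: if no reply was sent, a caller blocked in PostWithReply keeps waiting.
                }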
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}
+2  A: 

A long shot, but anyway..

I am assuming that a discriminated union is F#'s name for an ADT (algebraic data type), which means the value could be one of several things.

In case there are two, you could try and put it in a simple generic class with two type parameters:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t2) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

To make it work for three or more, we need to replicate this class a number of times. Does anyone have a solution for function overloading depending on the runtime type?

jdv
+2  A: 

If you have this

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

in F#, then the C# equivalent of the IL generated for class Either<'a, 'b> has inner types like

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

each with a tag, a getter and a factory method

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

plus a discriminator

internal int  Tag { get; }

and a raft of methods to implement interfaces IStructuralEquatable, IComparable, IStructuralComparable

Steve Gilham
For info, this exists internally inside the reactive framework. You can crack it open with Reflector and see how they do it.
Benjol
+3  A: 

Union types and pattern matching map pretty directly to the visitor pattern; I've posted about this a few times before.

So if you want to pass messages with lots of different types, you're stuck implementing the visitor pattern.

(Warning, untested code ahead, but it should give you an idea of how it's done)

Let's say we have something like this:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

You end up with this bulky C# code:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVisitor<T> visitor);
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Sub(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Query(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Now whenever you want to do something with the message, you need to implement a visitor:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, State.Item2, 1 + State.Item3));
    }
}

Then finally you can use it like this:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new Query(null) };
var counts = msgs.Aggregate(new MsgTypeCounter(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Yes, it's as obtuse as it seems, but that's how you pass multiple messages to a class in a type-safe manner, and that's also why we don't implement unions in C# ;)

Juliet
I think it's pretty rare in C# to define a visitor over a type which isn't tree-like. A simple variation of Steve's example of what the compiler outputs could be more concise without being unidiomatic.
kvb
Actually, on further review I really like the safety of your method; it's just a bit verbose. I've updated my answer with a variation that more closely mimics F# usage by essentially inlining your types.
kvb
+5  A: 

Steve Gilham summarized how the compiler actually handles discriminated unions. For your own code, you could consider a simplified version of that. Given the following F#:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Here's one way to emulate it in C#:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Now, in code that is given a QueueMessage, you can switch on the MessageType property in lieu of pattern matching, and make sure that you access the Item property only on EnqueueMessages.
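
As a rough illustration of that switch, here is a minimal sketch. To stay self-contained it uses a trimmed-down copy of the class above, folding the two data-free cases into one private class; the `Simple` and `Enqueue` class names, the `Describe` helper, and the `Demo` class are all invented for this example.

```csharp
using System;

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

// Trimmed-down copy of QueueMessage<T>, kept only so the sketch compiles on its own.
public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new Simple(MessageType.ClearMessage); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new Simple(MessageType.TryDequeueMessage); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new Enqueue(item); }

    // Covers both cases that carry no data (hypothetical simplification).
    private sealed class Simple : QueueMessage<T>
    {
        private readonly MessageType type;
        public Simple(MessageType type) { this.type = type; }
        public override MessageType MessageType { get { return type; } }
        public override T Item { get { throw new NotImplementedException(); } }
    }

    private sealed class Enqueue : QueueMessage<T>
    {
        private readonly T item;
        public Enqueue(T item) { this.item = item; }
        public override MessageType MessageType { get { return MessageType.EnqueueMessage; } }
        public override T Item { get { return item; } }
    }
}

public static class Demo
{
    // Hypothetical helper: the switch stands in for an F# match expression.
    public static string Describe(QueueMessage<string> msg)
    {
        switch (msg.MessageType)
        {
            case MessageType.ClearMessage:
                return "clear";
            case MessageType.TryDequeueMessage:
                return "try-dequeue";
            case MessageType.EnqueueMessage:
                // Item is only read in the one case where it is implemented.
                return "enqueue " + msg.Item;
            default:
                throw new ArgumentOutOfRangeException("msg");
        }
    }

    public static void Main()
    {
        Console.WriteLine(Describe(QueueMessage<string>.MakeClearMessage()));          // prints "clear"
        Console.WriteLine(Describe(QueueMessage<string>.MakeEnqueueMessage("hello"))); // prints "enqueue hello"
    }
}
```

Unlike an F# match, nothing here makes the compiler check that every case is handled, which is why the default branch throws.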

EDIT

Here's another alternative, based on Juliet's code. I've tried to streamline things so that it's got a more usable interface from C#, though. This is preferable to the previous version in that you can't get a NotImplementedException.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

You'd use this code like this:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here; a concrete message is used so the sample runs
        QueueMessage<string> msg = QueueMessage<string>.MakeEnqueueMessage("hello");

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                () => -1,
                () => -2,
                s => s.Length);
    }
}
kvb
+1: although it exposes non-implemented methods on the other message types, it's simple and isn't a threat to sanity like the full-blown visitor pattern ;)
Juliet
+2  A: 

In your example code, you implement PostWithAsyncReply in terms of PostWithReply. That isn't ideal, because it means that when you call PostWithAsyncReply and the actor takes a while to handle it, there are actually two threads tied up: the one executing the actor and the one waiting for it to finish. It would be better to have the one thread executing the actor and then calling the callback in the asynchronous case. (Obviously in the synchronous case there's no avoiding the tying up of two threads).

Update:

More on the above: you construct an actor with an argument telling it how many threads to run. For simplicity suppose every actor runs with one thread (actually quite a good situation because actors can then have internal state with no locking on it, as only one thread accesses it directly).

Actor A calls actor B, waiting for a response. In order to handle the request, actor B needs to call actor C. So now A and B's only threads are waiting, and C's is the only one actually giving the CPU any work to do. So much for multi-threading! But this is what you get if you wait for answers all the time.

Okay, you could increase the number of threads you start in each actor. But you'd be starting them so they could sit around doing nothing. A stack uses up a lot of memory, and context switching can be expensive.

So it's better to send messages asynchronously, with a callback mechanism so you can pick up the finished result. The problem with your implementation is that you grab another thread from the thread pool, purely to sit around and wait. So you basically apply the workaround of increasing the number of threads. You allocate a thread to the task of never running.

It would be better to implement PostWithReply in terms of PostWithAsyncReply, i.e. the opposite way round. The asynchronous version is low-level. Building on my delegate-based example (because it involves less typing of code!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

So the private implementation returns a bool. The public asynchronous method accepts an action that will receive the return value; both the private implementation and the callback action are executed on the same thread.

Hopefully the need to wait synchronously is going to be the minority case. But when you need it, it could be supplied by a helper method, totally general purpose and not tied to any specific actor or message type:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

So now in some other actor we can say:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

The type argument to Wait is probably required here, since the compiler has nothing to infer T from when the lambda parameter is untyped; I haven't tried compiling any of this. But Wait basically magics up a callback for you, so you can pass it to some asynchronous method, and on the outside you just get back whatever was passed to the callback as your return value. Note that the lambda you pass to Wait still actually executes on the same thread that called Wait.
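
To make the Wait helper concrete, here is a small self-contained sketch. `MeasureAsync` and `WaitDemo` are hypothetical stand-ins for any callback-style method (they are not part of the actor code), and the helper is the same as above except that the event is disposed once the wait is over. The type argument is written out explicitly because the lambda parameter is untyped.

```csharp
using System;
using System.Threading;

public static class Helpers
{
    // Blocks the calling thread until the callback handed to 'activity' has been invoked.
    public static T Wait<T>(Action<Action<T>> activity)
    {
        T result = default(T);
        using (var finished = new ManualResetEvent(false))
        {
            activity(r =>
            {
                result = r;
                finished.Set();
            });
            finished.WaitOne();
        }
        return result;
    }
}

public static class WaitDemo
{
    // Hypothetical asynchronous API: reports the length of the text from a pool thread.
    public static void MeasureAsync(string text, Action<int> done)
    {
        ThreadPool.QueueUserWorkItem(state => done(text.Length));
    }

    public static void Main()
    {
        // The lambda runs on the calling thread; only 'done' runs on the pool thread.
        int length = Helpers.Wait<int>(r => MeasureAsync("hello", r));
        Console.WriteLine(length); // prints 5
    }
}
```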

We now return you to our regular program...

As for the actual problem you asked about, you send a message to an actor to get it to do something. Delegates are helpful here. They let you effectively get the compiler to generate you a class with some data, a constructor that you don't even have to call explicitly and also a method. If you're having to write a bunch of little classes, switch to delegates.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Now a specific derived actor follows this pattern:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

So to send a message to the actor, you just call the public methods. The private Impl method does the real work. No need to write a bunch of message classes by hand.

Obviously I've left out the stuff about replying, but that can all be done with more parameters. (See update above).
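
For reference, here is one way the pieces could fit together, as a sketch rather than a finished design. It assumes one background thread per actor and uses Monitor.Wait/Pulse for the queue; the `_sync` field, the `Run` loop and the `ActorDemo` class are my own additions for illustration, and shutdown handling is left out entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public abstract class Actor
{
    private readonly Queue<Action> _messages = new Queue<Action>();
    private readonly object _sync = new object();

    protected Actor()
    {
        // Assumption: a single worker thread, so actor state needs no locking of its own.
        var thread = new Thread(Run);
        thread.IsBackground = true;
        thread.Start();
    }

    protected void Submit(Action action)
    {
        lock (_sync)
        {
            _messages.Enqueue(action);
            Monitor.Pulse(_sync); // wake the worker if it is waiting for work
        }
    }

    private void Run()
    {
        while (true)
        {
            Action next;
            lock (_sync)
            {
                while (_messages.Count == 0) Monitor.Wait(_sync);
                next = _messages.Dequeue();
            }
            next(); // executed outside the lock, always on the actor's own thread
        }
    }
}

public class ChocolateMachineActor : Actor
{
    private bool InsertCoinImpl(int value)
    {
        return value == 10;
    }

    // The reply travels through the extra callback parameter.
    public void InsertCoin(int value, Action<bool> accepted)
    {
        Submit(() => accepted(InsertCoinImpl(value)));
    }
}

public static class ActorDemo
{
    public static void Main()
    {
        var machine = new ChocolateMachineActor();
        using (var done = new ManualResetEvent(false))
        {
            machine.InsertCoin(10, ok =>
            {
                Console.WriteLine("accepted: " + ok); // prints "accepted: True"
                done.Set();
            });
            done.WaitOne(); // only so the demo does not exit before the reply arrives
        }
    }
}
```

The callback runs on the actor's thread, so it should stay short; anything slow is better handed off as a message to another actor.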

Daniel Earwicker
Can you explain why this is bad practice with a little more detail? Oh and I had this idea in the back of my mind but your example really shows the elegance of it. `ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));`
ChaosPandion
Oh I get it now. What I can do is add another Message implementation which I can add in the callback as a property. Thanks for checking out the code for me.
ChaosPandion
Not sure I understand your second comment. But anyway, I wrote a big update!
Daniel Earwicker
Communication is the hardest part of what we do. That being said the important thing is that I understand what you mean now (Thanks for the awesome update).
ChaosPandion
This was a hard decision because each idea had its merits. I think the elegance of using delegates puts it ahead.
ChaosPandion
Cool! I recommend looking at JavaScript/AJAX examples for ideas in this area. Everything is asynchronous, callbacks, anonymous functions, etc. Not multi-threaded but similar patterns to the above.
Daniel Earwicker