A while back I put together a simple class named Actor
that was my implementation of the Actor Model. Since then I've used it with great success (Minus some annoying workarounds for the lack of a discriminated union type.). I am left with an issue that I am unsure of how to resolve without making the class clunky and slow.
When someone defines a message they are of course within their right to include a reference to an object that the caller themselves can manipulate. Even with the knowledge that I will pretty much be the only person using this class, this still bothers me.
A good example of a way around this is with Web Workers as implemented in Firefox. When passing an object to a worker it is serialized to JSON.
Any ideas?
public abstract class Actor<T, U> : IDisposable
{
private const int AsyncChannelPoolSize = 20;
private volatile bool _disposed;
private readonly Thread _actorThread;
private readonly AsyncReplyChannel<T, U> _messageChannel;
private readonly Lazy<ObjectPool<AsyncChannel<U>>> _asyncChannelPool;
public event EventHandler<ExceptionEventArgs> Exception;
protected Actor()
{
_messageChannel = new AsyncReplyChannel<T, U>();
_asyncChannelPool = new Lazy<ObjectPool<AsyncChannel<U>>>(() => new ObjectPool<AsyncChannel<U>>(AsyncChannelPoolSize));
_actorThread = new Thread(ProcessMessages);
_actorThread.IsBackground = true;
_actorThread.Start();
}
public U PostWithReply(T value)
{
ThrowIfDisposed();
var replyChannel = default(AsyncChannel<U>);
var replyPackage = default(AsyncReplyPackage<T, U>);
var replyMessage = default(U);
try
{
replyChannel = _asyncChannelPool.Value.Get();
replyPackage = new AsyncReplyPackage<T, U>(value, replyChannel);
_messageChannel.Send(replyPackage);
replyMessage = replyChannel.Receive();
}
finally
{
_asyncChannelPool.Value.Put(replyChannel);
}
return replyMessage;
}
public void PostWithAsyncReply(T value, IAsyncChannel<U> replyChannel)
{
ThrowIfDisposed();
_messageChannel.Send(new AsyncReplyPackage<T, U>(value, replyChannel));
}
public void Dispose()
{
Dispose(true);
}
protected abstract void ProcessMessage(AsyncReplyPackage<T, U> package);
protected virtual void OnException(Exception ex)
{
var exceptionEvent = Exception;
if (exceptionEvent != null)
{
exceptionEvent(this, new ExceptionEventArgs(ex));
}
}
protected virtual void Dispose(bool disposing)
{
_disposed = true;
_messageChannel.Dispose();
if (_asyncChannelPool.IsValueCreated)
{
_asyncChannelPool.Value.Dispose();
}
}
private void ProcessMessages()
{
var package = default(AsyncReplyPackage<T, U>);
while (_messageChannel.TryReceive(out package) && !_disposed)
{
try
{
ProcessMessage(package);
}
catch (Exception ex)
{
OnException(ex);
}
}
}
private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
}
}