What is the preferred way, if any, to handle channel input in a prioritized fashion? Is there anything equivalent to Scala's "reactWithin(0) { ... case TIMEOUT }" construct?

A: 

I was looking at the basic examples on the Retlang home site and noticed this test:

[Test]
public void RequestReply()
{
    using (var fiber = new PoolFiber())
    {
        fiber.Start();
        var channel = new RequestReplyChannel<string, string>();
        channel.Subscribe(fiber, req => req.SendReply("bye"));
        var reply = channel.SendRequest("hello");

        string result;
        Assert.IsTrue(reply.Receive(10000, out result));
        Assert.AreEqual("bye", result);
    }
}

Looks like a Question/Response/Timeout example.

Rick
This doesn't have anything to do with my question.
anthony
A: 

I wrote a Subscription class that delivers prioritized messages on a set interval. It's not an ideal general-case way to consume prioritized messages, but I'll post it for posterity. I think a custom RequestReplyChannel would be a better option for certain other cases. The implementation of PriorityQueue is left as an exercise for the reader.

class PrioritySubscriber<T> : BaseSubscription<T>
{
    private readonly PriorityQueue<T> queue;
    private readonly IScheduler scheduler;
    private readonly Action<T> receive;
    private readonly int interval;

    private readonly object sync = new object();
    private ITimerControl next = null;

    public PrioritySubscriber(IComparer<T> comparer, IScheduler scheduler,
        Action<T> receive, int interval)
    {
        this.queue = new PriorityQueue<T>(comparer);
        this.scheduler = scheduler;
        this.receive = receive;
        this.interval = interval;
    }

    protected override void OnMessageOnProducerThread(T msg)
    {
        lock (this.sync)
        {
            this.queue.Enqueue(msg);

            if (this.next == null)
            {
                this.next =
                    this.scheduler.Schedule(this.Receive, this.interval);
            }
        }
    }

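    private void Receive()
    {
        T msg;

        lock (this.sync)
        {
            msg = this.queue.Dequeue();

            if (this.queue.Count > 0)
            {
                // More messages are waiting: re-arm the timer.
                this.next =
                    this.scheduler.Schedule(this.Receive, this.interval);
            }
            else
            {
                // Queue drained: clear the handle so the next enqueued
                // message schedules a new delivery.
                this.next = null;
            }
        }

        // Invoke the callback outside the lock.
        this.receive(msg);
    }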
}
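
For anyone who doesn't want to do the exercise, here is a minimal sketch of a PriorityQueue<T> that would fit the class above. It is not part of Retlang. It is a plain binary heap over a List<T>, and it assumes that the item which sorts first under the supplied comparer is the one to dequeue first. It exposes only the members the subscriber uses (Enqueue, Dequeue, Count), and it does not preserve arrival order among messages of equal priority.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal PriorityQueue<T>: a binary min-heap over a List<T>.
// Assumption: the item that sorts first under the comparer is dequeued first.
// Not thread-safe on its own; the subscriber above guards it with a lock.
class PriorityQueue<T>
{
    private readonly List<T> heap = new List<T>();
    private readonly IComparer<T> comparer;

    public PriorityQueue(IComparer<T> comparer)
    {
        this.comparer = comparer ?? Comparer<T>.Default;
    }

    public int Count
    {
        get { return this.heap.Count; }
    }

    public void Enqueue(T item)
    {
        // Append at the bottom, then sift up until the parent sorts first.
        this.heap.Add(item);
        int child = this.heap.Count - 1;
        while (child > 0)
        {
            int parent = (child - 1) / 2;
            if (this.comparer.Compare(this.heap[child], this.heap[parent]) >= 0)
            {
                break;
            }
            this.Swap(child, parent);
            child = parent;
        }
    }

    public T Dequeue()
    {
        if (this.heap.Count == 0)
        {
            throw new InvalidOperationException("The queue is empty.");
        }

        // Take the root, move the last item to the root, then sift down.
        T top = this.heap[0];
        int last = this.heap.Count - 1;
        this.heap[0] = this.heap[last];
        this.heap.RemoveAt(last);

        int parent = 0;
        while (true)
        {
            int left = 2 * parent + 1;
            if (left >= this.heap.Count)
            {
                break;
            }
            int right = left + 1;
            int first = left;
            if (right < this.heap.Count &&
                this.comparer.Compare(this.heap[right], this.heap[left]) < 0)
            {
                first = right;
            }
            if (this.comparer.Compare(this.heap[first], this.heap[parent]) >= 0)
            {
                break;
            }
            this.Swap(first, parent);
            parent = first;
        }

        return top;
    }

    private void Swap(int i, int j)
    {
        T tmp = this.heap[i];
        this.heap[i] = this.heap[j];
        this.heap[j] = tmp;
    }
}
```

If "higher value means higher priority" suits your messages better, pass a comparer that reverses the comparison rather than changing the heap.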
anthony
Instead of writing your own, you could consider using Wintellect's Power Collections for .NET (OrderedBag<T>), which you can find on CodePlex.
Rick