
EDIT: It occurred to me (perhaps too late?) that all the code I posted in my first update to this question was way too much for most readers. I've actually gone ahead and written a blog post about this topic for anyone who cares to read it.

In the meantime, I've left the original question in place, to give a brief glimpse at the problem I'd like to solve.

I'll also just note that the code I have posted (on my blog) has, thus far, stood up pretty well to testing. But I'm still interested in any and all feedback people are willing to give me on how clean/robust/performant* it is.

*I love how that word doesn't really mean what we think, but we developers use it all the time anyway.


Original Question

Sorry for the vague question title -- not sure how to encapsulate what I'm asking below succinctly. (If someone with editing privileges can think of a more descriptive title, feel free to change it.)

The behavior I need is this. I am envisioning a worker class that accepts a single delegate task in its constructor (for simplicity, I would make it immutable -- no more tasks can be added after instantiation). I'll call this task T. The class should have a simple method, something like GetToWork, that will exhibit this behavior:

  1. If the worker is not currently running T, then it will start doing so right now.
  2. If the worker is currently running T, then once it is finished, it will start T again immediately.
  3. GetToWork can be called any number of times while the worker is running T; the simple rule is that, during any execution of T, if GetToWork was called at least once, T will run again upon completion (and then if GetToWork is called while T is running that time, it will repeat itself again, etc.).

Now, this is pretty straightforward with a boolean switch. But this class needs to be thread-safe, by which I mean that steps 1 and 2 above need to be atomic operations (at least I think they do).
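For concreteness, here is a rough lock-based sketch of the contract I have in mind (the names are just the ones from this question; it uses the thread pool so GetToWork returns immediately, and it ignores the chaining part for now):

using System;
using System.Threading;

public sealed class Worker
{
    private readonly Action _task;      // the single task T, fixed at construction
    private readonly object _gate = new object();
    private bool _running;
    private bool _pending;

    public Worker(Action task)
    {
        _task = task;
    }

    public void GetToWork()
    {
        lock (_gate)
        {
            if (_running)
            {
                _pending = true;        // any number of calls collapse into one re-run
                return;
            }
            _running = true;
        }
        ThreadPool.QueueUserWorkItem(delegate { RunLoop(); });
    }

    private void RunLoop()
    {
        while (true)
        {
            _task();
            lock (_gate)
            {
                if (!_pending)
                {
                    _running = false;   // go idle; the next GetToWork starts fresh
                    return;
                }
                _pending = false;       // flagged while running: go again exactly once
            }
        }
    }
}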

There is an added layer of complexity. I have need of a "worker chain" class that will consist of many of these workers linked together. As soon as the first worker completes, it essentially calls GetToWork on the worker after it; meanwhile, if its own GetToWork has been called, it restarts itself as well. Logically calling GetToWork on the chain is essentially the same as calling GetToWork on the first worker in the chain (I would fully intend that the chain's workers not be publicly accessible).

One way to imagine how this hypothetical "worker chain" would behave is by comparing it to a team in a relay race. Suppose there are four runners, W1 through W4, and let the chain be called C. If I call C.StartWork(), what should happen is this:

  1. If W1 is at his starting point (i.e., doing nothing), he will start running towards W2.
  2. If W1 is already running towards W2 (i.e., executing his task), then once he reaches W2, he will signal to W2 to get started, immediately return to his starting point and, since StartWork has been called, start running towards W2 again.
  3. When W1 reaches W2's starting point, he'll immediately return to his own starting point.
    1. If W2 is just sitting around, he'll start running immediately towards W3.
    2. If W2 is already off running towards W3, then W2 will simply go again once he's reached W3 and returned to his starting point.

The above is probably a little convoluted and written out poorly. But hopefully you get the basic idea. Obviously, these workers will be running on their own threads.
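To put the relay in code terms, the chain could be wired up from the Worker sketch above, with each runner's task ending by poking the next worker (RunLeg1 through RunLeg4 are hypothetical placeholders for the actual tasks):

using System;

static class ChainWiring
{
    static void RunLeg1() { /* W1's actual work */ }
    static void RunLeg2() { /* W2's actual work */ }
    static void RunLeg3() { /* W3's actual work */ }
    static void RunLeg4() { /* W4's actual work */ }

    public static Worker BuildChain()
    {
        Worker w4 = new Worker(RunLeg4);
        Worker w3 = new Worker(() => { RunLeg3(); w4.GetToWork(); });
        Worker w2 = new Worker(() => { RunLeg2(); w3.GetToWork(); });
        Worker w1 = new Worker(() => { RunLeg1(); w2.GetToWork(); });
        return w1;                      // C.StartWork() is then just w1.GetToWork()
    }
}

Because GetToWork returns immediately, W1 becomes available for his next lap while W2 is still running, which is the behavior described above.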

Also, I guess it's possible this functionality already exists somewhere? If that's the case, definitely let me know!

+1  A: 

Use semaphores. Each worker is a thread with the following code (pseudocode):

WHILE(TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID) //The semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID) //The semaphore for the next worker
END

A non-zero semaphore means that someone has signaled the current thread to do the work. Once the worker acquires a non-zero semaphore at its entry line, it resets the semaphore (marking it as "no one has signaled"), does the work (during which the semaphore can be posted again), and then posts the semaphore for the next worker. The story repeats in the next worker(s).
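In C#, a rough translation of the pseudocode might look like this (a sketch; the class and member names are mine, and RESET is approximated by draining any extra posts, which collapses multiple signals received during a run into a single re-run):

using System.Threading;

public class ChainWorker
{
    private readonly Semaphore _mySignal;    // WORKER_ID's semaphore
    private readonly Semaphore _nextSignal;  // the next worker's semaphore, or null for the last

    public ChainWorker(Semaphore mySignal, Semaphore nextSignal)
    {
        _mySignal = mySignal;
        _nextSignal = nextSignal;
    }

    // Runs on its own dedicated thread.
    public void Run()
    {
        while (true)
        {
            _mySignal.WaitOne();                    // WAIT_FOR_SEMAPHORE(WORKER_ID)
            while (_mySignal.WaitOne(0, false)) { } // RESET_SEMAPHORE: drain pending posts
            DoWork();                               // /* DO WORK */
            if (_nextSignal != null)
                _nextSignal.Release();              // POST_SEMAPHORE(NEXT_WORKER_ID)
        }
    }

    private void DoWork() { /* the worker's task goes here */ }
}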

Victor Hurdugaci
I have much to learn about using semaphores. Thanks for the advice -- I am looking into them now.
Dan Tao
+1  A: 

A naive implementation that you may get some mileage from.

Note:

It is my understanding that scalar types, e.g. the bool flags controlling execution, have atomic assignment, making them as thread-safe as you would need/want in this scenario.

There are much more complex possibilities involving semaphores and other strategies, but if simple works....

using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        private bool _busy;
        private bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }

            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine("{0} begin", _id);

                _work.Invoke();

                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine("{0} end", _id);

                _busy = false;
            } while (_flagged);
        }
    }
}
Sky Sanders
The basic idea of the two boolean flags is simple and seems pretty sound. I like it. There is a problem with this implementation for my purposes, however: a given `FlaggedChainedWorker` isn't going to report being free (i.e., `!_busy`) until *all* workers in the chain--its own `_innerWorker`, and *that* worker's `_innerWorker`, and so on--are finished. The first worker needs to be ready to run again while the second is going.
Dan Tao
I think `GetToWork` needs to just *start* the worker working, not call `Invoke` on the assigned task (which could be a long-running task). So I *think* `BeginInvoke` is the way to go. It's a kind of complicated scenario, though, so I might be missing something.
Dan Tao
@Dan - the implication I see is that there is to be no true relation between the nested tasks, e.g. start outer, outer starts inner, which takes twice as long; meanwhile outer has been flagged and executed twice but only resulting in one execution of inner. And this disparity between execution counts will grow over the life of the worker. Does this describe your intended scenario? It is an unusual workflow, but is very easily implemented as above. Let me know. OR, if upon reflection you decide that you actually do want a call to the outer DoWork to be atomic, well then... ;-)
Sky Sanders
@Dan - in regards to Invoke vs. BeginInvoke: in this implementation, thread management is external to the worker. What you seem to be describing in the comments is a thread pool implementation of some sort; that is my understanding of what I am reading.
Sky Sanders
The disparity in execution count could indeed grow. What I'm describing is, I suppose, similar to a thread pool, but with a couple of important differences. First, each individual work item needs to be atomic in the sense that it does not overlap with itself (I *think* atomic is the right word?). If I have a thread pool with four threads available and I queue the same method to run four times, it will be running in four places concurrently. Second, the chain should represent a fixed sequence of methods--A followed by B followed by C--rather than a mutable queue.
Dan Tao
Another point worth mentioning about the atomicity of using boolean flags: it may be that setting *or* checking a `bool` is atomic; however, doing *both* (i.e., one followed by the other) is obviously *not* atomic, which means that this code could result in `GetToWork` running twice concurrently. (Scenario: thread 1 checks `if (_busy)` and, finding it `false`, moves forward; in the very next operation, thread 2 is *already* at `flagged = false;` -- it moves ahead to `_busy = true;` but thread 1 has already gotten through!)
Dan Tao
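One way to close the check-then-act gap Dan describes, without taking a full lock, is Interlocked.CompareExchange on an int flag. A sketch (not code from this answer; note how fiddly even the release step gets):

using System;
using System.Threading;

public sealed class AtomicFlaggedWorker
{
    private readonly Action _work;
    private int _busy;                  // 0 = idle, 1 = busy (int, since Interlocked has no bool overload)
    private volatile bool _flagged;

    public AtomicFlaggedWorker(Action work)
    {
        _work = work;
    }

    public void GetToWork()
    {
        // Atomically transition idle -> busy; losers of the race just set the flag.
        if (Interlocked.CompareExchange(ref _busy, 1, 0) != 0)
        {
            _flagged = true;
            return;
        }

        while (true)
        {
            _flagged = false;
            _work();

            if (_flagged)
                continue;                           // flagged mid-run: go again

            Interlocked.Exchange(ref _busy, 0);     // release the busy flag
            // A flag may have been set between the check above and the release;
            // if so, try to reacquire -- otherwise whoever acquired busy will run.
            if (!_flagged || Interlocked.CompareExchange(ref _busy, 1, 0) != 0)
                return;
        }
    }
}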
+1  A: 

Seems to me that you're overcomplicating this. I've written these "pipeline" classes before; all you need is a queue of workers each with a wait handle that gets signaled after the action is complete.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}

And here's a test program, which you can use to verify that actions always get executed in the correct order and only one of the same action can ever execute at once:

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);
        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
        Console.ReadKey(); // keep the process alive while the queued work runs
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}

Short, simple, completely thread-safe.

If for some reason you need to start working at a specific step in the chain instead, then you can just add an overload for Start:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index))
        currentStage.Execute();
}

Edit

Based on comments, I think a few minor changes to the inner Stage class should be enough to get the kind of behaviour you want. We just need to add a "queued" event in addition to the "ready" event.

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;
        private readonly EventWaitHandle queuedEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
            this.queuedEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
            queuedEvent.Close();
        }

        private bool CanExecute()
        {
            if (readyEvent.WaitOne(0, true))
                return true;
            if (!queuedEvent.WaitOne(0, true))
                return false;
            readyEvent.WaitOne();
            queuedEvent.Set();
            return true;
        }

        public bool Execute()
        {
            if (!CanExecute())
                return false;
            action();
            readyEvent.Set();
            return true;
        }
    }

Also change the pipeline's Start method to break if a stage can't execute (i.e. is already queued):

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index))
        if (!currentStage.Execute())
            break;
}

The concept here is pretty simple, again:

  • A stage first tries to immediately acquire the ready state. If it succeeds, then it starts running.
  • If it fails to acquire the ready state (i.e. the task is already running), then it tries to acquire the queued state.
    • If it gets the queued state, then it waits for the ready state to become available and then releases the queued state.
    • If it can't get the queued state either, then it gives up.

I've read over your question and comments again and I'm pretty sure this is exactly what you're trying to do, and gives the best trade-off between safety, throughput, and throttling.

Because the ThreadPool can sometimes take a while to respond, you should up the delay in the test program to 1000 instead of 100 if you want to really see the "skips" happening.
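For example, with the delay raised, a small driver like the following (a variation on the test program above, assuming the edited Stage class) makes the collapsing visible:

using System;
using System.Threading;

class SkipDemo
{
    static void Main()
    {
        Pipeline pipeline = new Pipeline(GetTestAction(1), GetTestAction(2));
        for (int i = 0; i < 10; i++)
        {
            // Start(0): run from the first stage, stopping at any already-queued stage
            ThreadPool.QueueUserWorkItem(s => pipeline.Start(0));
        }
        Console.ReadKey(); // keep the process alive long enough to observe the skips
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(1000); // 1000 instead of 100, per the note above
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}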

Aaronaught
About overcomplicating the problem: definitely a possibility! I'm looking at this code right now and liking how you have it set up. One issue I'm seeing is that I want the `Start` method on your `Pipeline` class to return immediately. But then, obviously, I could achieve this by putting the call to `ThreadPool.QueueUserWorkItem` *inside* the `Start` method itself (or, as you've done, leave it to the caller whether they want the call to block or not).
Dan Tao
Actually, I just spotted another issue, and I'm curious to know what you would suggest to address it. (Aside from this, I really like what you have here and am inclined to prefer it to my implementation, since it's definitely simpler and cleaner.) The issue is that I want a method in the chain to only repeat itself *once* if it has been called multiple times while running.
Dan Tao
For instance in this `Pipeline` class if I call `ThreadPool.QueueUserWorkItem(s => pipeline.Start());` three times while the first action is executing, it will execute three more times upon completion (and this could easily cause the queue to grow to an absurdly large size, depending on the frequency of method calls). I would want it to only execute *one* more time. Thoughts?
Dan Tao
@Dan: That seems like kind of an arbitrary requirement to me. Why does a stage need to queue itself exactly once if it's busy, as opposed to zero times? The latter's much easier. If it's really possible for the queue to get out of control like this, I would probably just limit access to the pipeline with a `Semaphore`, so that only a limited number of threads can use it at the same time.
Aaronaught
Ha, zero would indeed be much easier. But here's the thing: this is for processing market data and running pricing algorithms. When considered in that light the requirement isn't really arbitrary. Suppose the pricing model is currently running, and a thousand market ticks come in all at once. This means that new data has arrived and we'll want to run the model again to produce up-to-date theoretical prices based on the latest data. We wouldn't want to simply *not* run the model again as then our theo would be stale. On the other hand, there'd be no need to run it a thousand more times!
Dan Tao
Of course, if, while running the pricing model the *second* time, a thousand *more* ticks come in, we'll want to repeat it *again* when it's finished. But, supposing that's the end of it (for the next several seconds, anyway), this should be the end result: three executions of the pricing model. As opposed to two thousand executions. Does that make sense?
Dan Tao
@Dan: I could see the complexity of this getting out of control if the pipeline is allowed to start at any stage. What happens if a request is made to start at stage 4, and then later on, before that one has started, a new request comes in for stage 2? Should the stage 4 request be discarded, even though it came in first?
Aaronaught
My concept for this class actually doesn't really require that any arbitrary stage be accessible from outside the chain/pipeline. That said, it also shouldn't be a show-stopper if some method is called independently at any particular time (e.g., if stage 4 actually points to some publicly accessible method and that method gets called from outside the chain/pipeline). The basic premise here is of a system that ensures methods get called exactly as often as necessary given certain triggering events (in my case, the receipt of incoming market data).
Dan Tao
Oh wow, I *just* noticed the edit you made with the addition of the `CanExecute` method. Looks pretty solid. It took me a second to understand how you did things because the name `queuedEvent` threw me off (wouldn't it be more accurate to call it `dequeuedEvent` or `queueOpenEvent` or something?). You and I approached the problem very differently; I suspect your idea performs better. I look forward to testing it out soon.
Dan Tao
@Dan: I fixed a minor oversight, the pipeline should stop if an action fails to execute.
Aaronaught
True, good call. I had missed that myself. Now here's something that occurred to me later: since your approach for making the `Start` method return immediately would be to leverage `ThreadPool.QueueUserWorkItem`, is the following scenario possible? Let's say that I call `Start` as many times as possible during the timespan in which the first method is executing. Really, the first method should execute only one more time. But if I've used `QueueUserWorkItem`, some of those calls might not come along until a few moments later, in which case it will execute yet once more. Is that true?
Dan Tao
...not that it would be a big deal, mind you! Just a question.
Dan Tao
@Dan: Technically, yes, but it would have to happen within microseconds of the stage finishing, which is very unlikely if these stages take several seconds or even milliseconds. And in that case, it's equally possible that the last `Start` call would have come in a few microseconds later in the first place, so IMO it's not really worth worrying about (and as far as I know, no design can prevent it anyway).
Aaronaught