I have an application that is based on a message-pump thread-pool architecture. Whenever there is an action that could block, it is implemented as a "callback on complete/trigger event" action, so it won't stall the executing thread.

While this technique is appropriate for most cases, there are situations where it becomes very inconvenient and over-complicates the code.

What I'd like to be able to do is keep processing events while waiting, in a transparent way, without breaking the function up into pre/post-waiting parts.

How should I do this?

I had two options in mind:

  1. Run the message loop from within the executing function while waiting.
  2. Create a new worker thread while waiting, and terminate it (in a proper way) when resuming.

Both options have their flaws, to name a few:

For 1:

  • Could potentially result in stack overflow.
  • Could potentially end up dead-locked.
  • If handling the inner message ends up waiting for a second event, and the outer event completes in the meanwhile, the outer function still can't continue until the second event completes, and this nesting can grow arbitrarily deep.

Option 2 can simply end up in creating more and more threads.

Of course, there might be other options that I haven't thought of.

EDIT: Language is C++, so functions can't be stepped out of and into in an easy (portable?) manner. Platform is Windows (API), although I don't think it's relevant.

A: 

EDIT: You mention not wanting to "break the function up into pre/post-waiting parts."

What language are you developing in? If it has continuations (yield return in C#) then that provides a way to write code that appears to be procedural but which can easily be paused until a blocking operation makes its completion callback.

Here's an article about the idea: http://msdn.microsoft.com/en-us/magazine/cc546608.aspx

UPDATE:

Unfortunately, the language is C++

That would make a great T-shirt slogan.

Okay, so you might find it helpful to structure your sequential code as a state-machine, so it becomes interrupt/resume-capable.

e.g. your pain is needing to write two functions, the one that initiates and the one that acts as the handler for the completion event:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;
    begin_sending_string_somehow(msg, greeting_sent_okay);
}

void greeting_sent_okay()
{
    std::cout << "Greeting has been sent successfully." << std::endl;
}

Your idea was to wait:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;

    waiter w;
    begin_sending_string_somehow(msg, w);
    w.wait_for_completion();

    std::cout << "Greeting has been sent successfully." << std::endl;
}

In that example, waiter overloads operator() so it can serve as a callback, and wait_for_completion somehow blocks until it sees that operator() has been called.

I'm assuming that begin_sending_string_somehow's second parameter is a template parameter that can be any callable type accepting no parameters.
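For concreteness, here is a minimal sketch of one way such a waiter could be built on Windows, using a manual-reset event. This is only an illustration: begin_sending_string_somehow is still hypothetical, and it would have to hold a reference to the waiter rather than a copy, so that the completion callback signals the same event the caller is waiting on.

#include <windows.h>

// Sketch only: one possible implementation of the hypothetical "waiter",
// built on a Win32 manual-reset event.
class waiter
{
    HANDLE event_;

    waiter(const waiter &);             // non-copyable; the event handle is owned
    waiter &operator=(const waiter &);

public:
    waiter() : event_(CreateEvent(NULL, TRUE, FALSE, NULL)) {}
    ~waiter() { CloseHandle(event_); }

    // Invoked as the completion callback.
    void operator()() { SetEvent(event_); }

    // Blocks the calling thread until operator() has been called.
    void wait_for_completion() { WaitForSingleObject(event_, INFINITE); }
};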

But as you say, this has drawbacks. Any time a thread is waiting like that, you've added another potential deadlock, and you are also consuming the "resource" of a whole thread and its stack, meaning that more threads will have to be created elsewhere to allow work to be done, which contradicts the whole point of a thread pool.

So instead, write a class:

class send_greeting
{
    int state_;
    std::string msg_;

public:
    send_greeting(const std::string &msg)
        : state_(0), msg_(msg) {}

    void operator()()
    {
        switch (state_++)
        {
            case 0:
                std::cout << "Sending the greeting" << std::endl;
                begin_sending_string_somehow(msg_, *this);
                break;

            case 1:
                std::cout << "Greeting has been sent successfully." 
                          << std::endl;
                break;
        }
    }
};

The class implements the function call operator (). Each time it is called, it executes the next step in the logic. (Of course, being such a trivial example, this now is mostly state management noise, but in a more complex example with four or five states it may help clarify the sequential nature of the code).

Problems:

  • If the event callback function signature has special parameters, you'll need to add another overload of operator() that stores the parameters in extra fields and then calls onto the parameterless overload. Then it starts to get messy because those fields will be accessible at compile-time in the initial state, even though they are not meaningful at runtime in that state.

  • How do objects of the class get constructed and deleted? The object has to survive until the operation completes or is abandoned... the central pitfall of C++. I'd recommend implementing a general scheme to manage it. Create a list of "things that will need to be deleted" and ensure that this happens automatically at certain safe points, i.e. try to get as close as possible to GC as you can. The further away you are from that, the more memory you will leak.
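To illustrate that second point, one simple scheme (sketched here under the assumption that the completion callback always fires exactly once) is to heap-allocate the state machine and let it delete itself when it reaches its terminal state:

#include <iostream>
#include <string>

class send_greeting;

// Assumed to exist elsewhere (the same hypothetical call as in the example
// above).  It must keep a pointer/reference to the callback object rather
// than a copy, otherwise the "delete this" below would free the wrong object.
void begin_sending_string_somehow(const std::string &msg, send_greeting &done);

// Sketch of one lifetime scheme: heap-allocate the state machine and let it
// delete itself when it reaches its terminal state.
class send_greeting
{
    int state_;
    std::string msg_;

    send_greeting(const std::string &msg) : state_(0), msg_(msg) {}
    ~send_greeting() {}                    // heap-only; destroyed via "delete this"

public:
    // The only way to run the operation: allocate the object and kick it off.
    static void start(const std::string &msg)
    {
        (new send_greeting(msg))->operator()();
    }

    void operator()()
    {
        switch (state_++)
        {
        case 0:
            std::cout << "Sending the greeting" << std::endl;
            begin_sending_string_somehow(msg_, *this);
            break;

        case 1:
            std::cout << "Greeting has been sent successfully." << std::endl;
            delete this;                   // terminal state: clean up
            break;
        }
    }
};

If an operation can be abandoned part-way through, this is not enough on its own, which is exactly why the "things that will need to be deleted" list above is worth having.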

Daniel Earwicker
Unfortunately, the language is C++ :P
Thanks for the detailed answer. That's an interesting technique, however I'm not sure it fits my specific problem. See, one of the requirements, as I mentioned, is transparency, so send_greeting would have to look like this (pseudo): send_grt(msg) { ... ; send_string(msg); cout<<"message sent"; }, so the waiting part is inside send_string, and not send_greeting. Your solution works if the code is written as a state machine at the bottom level, while I'm looking for a top-level solution. Great idea anyway, and I'm sure I'll be using it elsewhere.
But then you're just dreaming about the impossible. If you *require* it to look like an ordinary synchronous function to the caller, then you *require* the waiting technique exactly as you originally described it, or as shown in my example with the hypothetical "waiter" class. That's pretty much all there is to it.
Daniel Earwicker
A: 

Without knowing more about your specific application (i.e. how long messages take to process, etc.) there will be lots of handwaving:

  • Is this managed or unmanaged C++?

  • Which ThreadPool are you using?

    • QueueUserWorkItem?
    • Your own pool via CreateIoCompletionPort?
    • Or Vista's SubmitThreadpoolWork?

I think platform is somewhat relevant as the nature of the Thread Pool is important.

For example:

If you use Completion Ports for your thread pool (i.e. CreateIoCompletionPort), you have some control over how many threads run concurrently (and hence over how many total threads are ultimately created). If you set the maximum number of concurrent threads to, say, 4, Windows will attempt to only allow 4 threads to run concurrently. If all 4 threads are busy processing and you queue a 5th item, Windows will not allow that item to run until one of the 4 is finished (reusing the thread). The only time this rule is broken is when threads are blocked (i.e. waiting on I/O); then more threads are allowed to run.

This is the important thing to understand about Completion Ports, and why the platform is relevant. It is very difficult to implement something like this without involving the Kernel. Knowing the difference between busy threads and blocked threads requires access to thread states. Completion ports are also very efficient with respect to the number of context switches into the Kernel.
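To make that concrete, here is a bare-bones sketch of such a pool. The names (work_item, start_pool, post_work) are invented for the illustration, and error handling and shutdown are glossed over:

#include <windows.h>
#include <process.h>

// Invented stand-in for whatever a queued message/work item looks like.
struct work_item
{
    virtual void run() = 0;
    virtual ~work_item() {}
};

static HANDLE g_port;

// Worker thread: pull completions off the port and execute them.
// Windows lets at most NumberOfConcurrentThreads of these run at once;
// blocked workers are compensated for by releasing another waiter.
unsigned __stdcall worker(void *)
{
    DWORD bytes;
    ULONG_PTR key;
    OVERLAPPED *ov;

    for (;;)
    {
        if (!GetQueuedCompletionStatus(g_port, &bytes, &key, &ov, INFINITE))
            continue;                      // real code: inspect GetLastError()
        if (key == 0)
            break;                         // our shutdown convention
        work_item *item = reinterpret_cast<work_item *>(key);
        item->run();
        delete item;
    }
    return 0;
}

void start_pool(DWORD concurrency, int thread_count)
{
    // The concurrency limit is enforced by the kernel, not by the thread count.
    g_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, concurrency);
    for (int i = 0; i < thread_count; ++i)
        _beginthreadex(NULL, 0, worker, NULL, 0, NULL);
}

void post_work(work_item *item)
{
    PostQueuedCompletionStatus(g_port, 0, reinterpret_cast<ULONG_PTR>(item), NULL);
}

The key point is the last argument to CreateIoCompletionPort: the kernel, not your thread count, decides how many of these workers actually run at once.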

Back to your question:

It would seem that you should have one thread to process/dispatch the messages, with the message processing itself handled by pushing workers onto a thread pool. Let Completion Ports handle the load balancing and concurrency. Your message-processing loop will never block and can continue to process messages.

If the rate of incoming messages far exceeds your ability to process them then you will probably have to pay attention to your queue size and block when it grows too large.

GrendleM
The thread pool is using its own queue and scheduling mechanism, and the C++ is unmanaged. However, I guess it could be converted to completion ports. I must admit that I have very little experience with completion ports, and it seems they're somewhat under-documented, so if you could suggest a good background link, that would be helpful (MSDN did a somewhat sloppy job for once). I didn't entirely get your suggestion, though. If I have only one thread to dispatch the port and spread the work, then completion ports don't actually handle the balancing for me. From what I know of CP, I can have more threads in the pool than the concurrency value, so blocked threads will be compensated for by the extra threads. The question is: how can I dynamically extend and shrink the pool so that ultimately there will always be threads ready for action?
If you keep a count of how many threads are currently "used", you can extend the number of threads. Basically, when GetQueuedCompletionStatus returns, increment a counter, dispatch your processing function, and then when it returns decrement the counter. If the counter exceeds the number of threads in your pool, add one. The idea is to always keep at least one additional thread in the pool waiting on GetQueuedCompletionStatus. Shrinking the pool is another story. You'll have to decide on how many extra threads are too many. You can use age or number to determine when to destroy excess threads.
GrendleM
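A rough sketch of that counter idea, continuing the hypothetical pool from the sketch above (g_port, work_item and the includes are as before); shrinking is omitted since, as noted, that is a separate policy decision:

// Sketch of the counter-based growth idea from the comment above.
// g_busy counts workers currently inside item->run(); g_threads is the
// current pool size.
static volatile LONG g_busy;
static volatile LONG g_threads;

unsigned __stdcall counting_worker(void *)
{
    DWORD bytes;
    ULONG_PTR key;
    OVERLAPPED *ov;

    for (;;)
    {
        if (!GetQueuedCompletionStatus(g_port, &bytes, &key, &ov, INFINITE))
            continue;
        if (key == 0)
            break;

        // If every thread is about to be busy, grow the pool so that at
        // least one thread is always parked in GetQueuedCompletionStatus.
        if (InterlockedIncrement(&g_busy) >= g_threads)
        {
            InterlockedIncrement(&g_threads);
            _beginthreadex(NULL, 0, counting_worker, NULL, 0, NULL);
        }

        work_item *item = reinterpret_cast<work_item *>(key);
        item->run();
        delete item;

        InterlockedDecrement(&g_busy);
        // Retiring idle threads after some age/count threshold is left as
        // a policy decision, as the comment says.
    }
    return 0;
}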
The thread balancing is basically handled by the concurrency value. Windows will try really hard to make sure that only that many threads are executing at the same time. Threads blocked on I/O are merely consuming memory. One thing to note is that you will want to control the maximum stack size of the threads in your pool. By default a thread commits 4K of physical memory for its stack but reserves 1MB of virtual memory. 2000 threads will exhaust all of the virtual memory on a 32-bit OS (practically speaking, you'll hit this limit much earlier than 2000 because of other things in VM).
GrendleM
A: 

It seems your problem is fundamental, and not related to C++. Other languages are perhaps better at hiding the stack usage, but as long as you haven't returned from Foo() you need the call stack for Foo(). And if you're also executing Bar(), that too needs a call stack.

Threads are an excellent approach to this, as each thread comes with its own callstack. Continuations are a smart but complicated way to save callstacks, so where available those are an option too. But if you don't want those, you'll have to make do with one callstack.

Dealing with one call stack requires addressing reentrancy. Here, there's no generic answer on what's possible. In general, you will have a set of messages M1..Mx which are handled by functions F1..Fy, with some application-specific and possibly state-dependent mapping. With a reentrant message loop, you might be executing Fi when you receive Mj. Now the problem is what to do. Not all functions F1..Fy may be callable; in particular Fi itself may not be callable. However, some other functions may also be unavailable, e.g. because they share resources. This is application-dependent.

If the processing of Mj requires any of these unavailable functions, you have to postpone it. Can you accept the next message in the queue? Again, that's implementation-dependent, and it may even relate to the message type and content. If the messages are sufficiently independent, it's possible to execute them out of order. This quickly becomes rather complex: to determine whether it's possible to accept the Nth message in the queue, you have to check whether it can be executed out-of-order with respect to the preceding N-1 messages.
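As a very rough illustration of the "postpone what you cannot run yet" idea, here is a sketch of a reentrancy-aware pump. The message type, the can_run_now predicate and the dispatch function are invented stubs; deciding what can_run_now should actually check is precisely the application-specific part, and note that this naive version silently reorders messages:

#include <deque>

// Invented types for illustration; the real message set, the mapping to
// handlers, and the availability rules are application-specific.
struct message { int type; };

bool can_run_now(const message &) { return true; }   // stub: real rule is app-specific
void dispatch(const message &) {}                    // stub: call the mapped handler

std::deque<message> incoming;   // messages waiting to be handled
std::deque<message> deferred;   // messages postponed during reentrant handling

// One pass of the pump: run what is currently allowed, postpone the rest.
void pump_once()
{
    std::deque<message> still_deferred;

    // Retry previously postponed messages first, preserving their order.
    while (!deferred.empty())
    {
        message m = deferred.front();
        deferred.pop_front();
        if (can_run_now(m)) dispatch(m);
        else                still_deferred.push_back(m);
    }
    deferred.swap(still_deferred);

    // Then drain the incoming queue the same way.
    while (!incoming.empty())
    {
        message m = incoming.front();
        incoming.pop_front();
        if (can_run_now(m)) dispatch(m);
        else                deferred.push_back(m);
    }
}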

A language can help you by not hiding dependencies, but in the end you must make the explicit decisions. There's no silver bullet.

MSalters
+1  A: 

For portable C++ this won't do, but since you've mentioned your platform is Windows, why not use MsgWaitForMultipleObjects? Its purpose is to let you do exactly what your question says - keep pumping messages while waiting.
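A minimal sketch of that pattern (assuming the waiting thread owns a window message queue, and with error handling omitted): wait on the completion event, and whenever MsgWaitForMultipleObjects reports pending input, drain and dispatch it before going back to waiting.

#include <windows.h>

// Sketch: block on a completion event but keep pumping window messages.
// 'done' is whatever event the pending operation signals on completion.
void pump_until_signaled(HANDLE done)
{
    for (;;)
    {
        DWORD r = MsgWaitForMultipleObjects(1, &done, FALSE, INFINITE, QS_ALLINPUT);

        if (r == WAIT_OBJECT_0)
            return;                       // the operation completed

        // r == WAIT_OBJECT_0 + 1: messages are waiting; drain them.
        MSG msg;
        while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
        {
            TranslateMessage(&msg);
            DispatchMessage(&msg);
        }
    }
}

Note that this is essentially the "run the message loop while waiting" option from the question, so the reentrancy and stack-depth caveats listed there still apply.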

eran
A: 

Your problem is synchronizing the threads, right? If that is your problem, why not use a mutex? It could be wrapped up with an interface. In fact, you could use the PIMPL idiom to make the mutex portable.

http://msdn.microsoft.com/en-us/library/system.threading.mutex(VS.71).aspx
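As a sketch of what such a PIMPL-wrapped mutex might look like (the interface is invented for illustration, with a Win32 CRITICAL_SECTION hidden behind it; another platform would supply its own impl):

// mutex.h -- the public interface keeps the Windows headers out of sight.
class mutex
{
public:
    mutex();
    ~mutex();
    void lock();
    void unlock();

private:
    struct impl;          // defined in mutex.cpp
    impl *impl_;

    mutex(const mutex &);             // non-copyable
    mutex &operator=(const mutex &);
};

// mutex.cpp -- the Windows-specific part.
#include <windows.h>

struct mutex::impl { CRITICAL_SECTION cs; };

mutex::mutex() : impl_(new impl) { InitializeCriticalSection(&impl_->cs); }
mutex::~mutex() { DeleteCriticalSection(&impl_->cs); delete impl_; }
void mutex::lock()   { EnterCriticalSection(&impl_->cs); }
void mutex::unlock() { LeaveCriticalSection(&impl_->cs); }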

Partial