Hello,

I'm doing some experiments with C++ multithreading and I have no idea how to solve one problem. Let's say we have a thread pool that processes user requests using an existing thread, and creates a new thread when no free thread is available. I've created a thread-safe command_queue class, which has push and pop methods. pop waits while the queue is empty and returns only when a command is available or a timeout has occurred. Now it's time to implement the thread pool. The idea is to make free threads sleep for some amount of time and kill a thread if there is nothing to do after that period. Here is the implementation:

command_queue::handler_t handler;
while (handler = tasks.pop(timeout))
{
    handler();
}

Here we exit the thread procedure if a timeout occurred. That is fine, but there is a problem with new thread creation. Let's say we already have 2 threads processing user requests; they are busy at the moment, but we need to do some other operation asynchronously. We call

thread_pool::start(some_operation);

which should start a new thread, because there are no free threads available. When a thread is free it calls timed_wait on a condition variable, so the idea is to check whether there are any threads waiting.

if (there_are_free_threads) // ???
   condition.notify_one();
else
   create_thread(thread_proc);

But how do I check that? The documentation says that if there are no waiting threads, notify_one does nothing. If I could check whether or not it did nothing, that would be a solution:

if (!condition.notify_one()) // nobody was notified
   create_thread(thread_proc);

As far as I can see there is no way to check that.

Thanks for your answers.
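For readers following along: the question does not show command_queue itself, so here is a minimal sketch of a queue with the timed pop it describes. It assumes C++11 `std::condition_variable` rather than the Boost primitives used in the thread, and everything apart from the names `command_queue`, `handler_t`, `push` and `pop` is made up for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the question's command_queue, not the asker's code.
class command_queue {
public:
    using handler_t = std::function<void()>;

    void push(handler_t h) {
        // An empty handler would be indistinguishable from a timeout in pop().
        assert(h != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(h));
        }
        ready_.notify_one();
    }

    // Blocks until a handler is available or `timeout` elapses.
    // Returns an empty handler_t on timeout.
    handler_t pop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!ready_.wait_for(lock, timeout, [this] { return !items_.empty(); }))
            return handler_t{};
        handler_t h = std::move(items_.front());
        items_.pop_front();
        return h;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<handler_t> items_;
};
```

With a queue like this, the loop `while (handler = tasks.pop(timeout))` ends exactly when pop times out, because an empty `std::function` converts to false.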

+2  A: 

You need to create another variable (perhaps a semaphore) that tracks how many threads are running; then you can check it and create a new thread, if needed, before calling notify.

The other, better option is to just not have your threads exit when they time out. They should stay alive, waiting to be notified. Instead of exiting when the wait times out, check a variable to see whether the program is still running or is "shutting down". If it's still running, start waiting again.
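A rough sketch of that second option, assuming C++11 standard threads: workers treat a timeout as a cue to re-check a shutdown flag, not as a reason to exit. The class name `keep_alive_pool`, the 100 ms poll interval and the member names are all assumptions made for this example.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool: workers leave only when the pool shuts down.
class keep_alive_pool {
public:
    explicit keep_alive_pool(unsigned count) {
        for (unsigned i = 0; i < count; ++i)
            workers_.emplace_back([this] { run(); });
    }

    ~keep_alive_pool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;  // the "shutting down" variable
        }
        wake_.notify_all();
        for (auto& t : workers_) t.join();
    }

    void start(std::function<void()> job) {
        assert(job != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push_back(std::move(job));
        }
        wake_.notify_one();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            wake_.wait_for(lock, std::chrono::milliseconds(100),
                           [this] { return stopping_ || !jobs_.empty(); });
            if (!jobs_.empty()) {
                std::function<void()> job = std::move(jobs_.front());
                jobs_.pop_front();
                lock.unlock();  // run the job without holding the lock
                job();
                lock.lock();
            } else if (stopping_) {
                return;         // nothing left to do and the pool is closing
            }
            // Otherwise this was a plain timeout: go back to waiting.
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};
```

In this sketch the destructor lets the workers drain any queued jobs before they exit, which is one possible shutdown policy among several.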

SoapBox
`boost::thread_group` can simplify the latter, i.e. you can call `interrupt_all()` to request the shutdown when they are waiting.
ybungalobill
The idea is to make a smart thread pool that does not keep unnecessary threads alive. So this does not solve the problem, it just avoids it by changing the task :) I'll try semaphores; they are supposed to be a bit slower, but if there is no other choice...
ledokol
Keeping threads alive and waiting on a condition variable is not a problem. It consumes no runtime resources and the stack space is small. On the other hand, creating threads is expensive.
Martin York
That is why I want to wait for some time before killing a thread. But let's say we have a server application that receives requests and handles them asynchronously: if we never kill the threads created at a peak moment, we might end up with dozens of threads that are doing absolutely nothing. That's not good from my point of view. I'm trying to make something similar to the .NET thread pool.
ledokol
+1  A: 

A more typical thread pool would look like this:

Pool::Pool()
{
    runningThreads = 0;
    actualThreads  = 0;
    finished       = false;
    jobQue.Init();

    mutex.Init();
    conditionVariable.Init();

    for(int loop=0; loop < threadCount; ++loop) { startThread(threadroutine); }
}

void Pool::threadroutine()
{

    {
        // Extra code to count threads so we can add more if required.
        RAIILocker doLock(mutex);
        ++ actualThreads;
        ++ runningThreads;
    }
    while(!finished)
    {
         Job job;
         {
             RAIILocker doLock(mutex);

             while(jobQue.empty())
             {
                 // This is the key.
                 // Here the thread is suspended (using zero resources)
                 // until some other thread calls the notify_one on the
                 // conditionVariable. At this point exactly one thread is released
                 // and it will start executing as soon as it re-acquires the lock
                 // on the mutex.
                 //
                 -- runningThreads;
                 conditionVariable.wait(mutex);
                 ++ runningThreads;
             }
             job = jobQue.getJobAndRemoveFromQue();
         }
         job.execute();
    }
    {
        // Extra code to count threads so we can add more if required.
        RAIILocker doLock(mutex);
        -- actualThreads;
        -- runningThreads;
    }
}

void Pool::AddJob(Job job)
{
    RAIILocker doLock(mutex);

    // This is where you would check to see if you need more threads.
    if (runningThreads == actualThreads) // Plus some other conditions.
    {
        // The new thread increments both counts when it starts; when it waits it decrements the running count.
        startThread(threadroutine);
    }
    jobQue.push_back(job);
    conditionVariable.notify_one();  // This releases one worker thread
                                     // from the call to wait() above.
                                     // Note: The worker thread will not start
                                     //       until this thread releases the mutex.
}
Martin York
I think we already have this in boost::asio?
ledokol
@ledokol: I am sure there is, but you seem to be trying to build your own. This is what it should look like.
Martin York
Yes, that's just for experimental purposes, but still boost::asio is not what I'm trying to build.
ledokol
@ledokol: The above is a relatively standard pattern for implementing a thread pool. If you need something else then you need to give a better explanation of what you need (not what you are trying to implement).
Martin York
OK. The thread pool doesn't have any threads after initialization. When the user requests an asynchronous operation, the thread pool creates a new thread and performs the operation. When the operation is finished, the thread sleeps for some amount of time, waiting for a new user request. After the timeout the thread is killed. So the thread pool gives the task to a free thread if one is available; if no free thread is available it creates a new thread which handles the request. A user request should not stay in the queue while there are no free threads; it should be handled immediately.
ledokol
Later I might add some limits like MaxThreadCount and MinThreadCount, and by configuring these values I can get the thread pool you described.
ledokol
A: 

I think you need to rethink your design. In a simple model of a dealer thread handing out work to the player threads, the dealer places the job onto the message queue and lets one of the players pick up the job when it gets a chance.

In your case the dealer is actively managing the thread pool, in that it keeps track of which player threads are idle and which are busy. Since the dealer knows which player is idle, the dealer can actively pass the job to the idle player and signal that player using a simple semaphore (or cond var), there being one semaphore per player. In such a case, it might make sense for the dealer to destroy idle threads actively by giving the thread a "kill myself" job to do.
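To make the dealer idea concrete, here is a minimal sketch under several assumptions: C++11 standard threads, one condition variable per worker, and an idle list that workers add themselves to when they finish a job. The names `dealer`, `assign`, `hired` and `idle_count` are invented for this example, and the idle timeout / "kill myself" job is left out to keep it short.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of a dealer that knows its idle workers by name.
class dealer {
    struct worker {
        std::condition_variable wake;  // one signal per worker
        std::function<void()> job;     // empty while the worker has nothing to do
        std::thread thread;
    };

public:
    ~dealer() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
            for (auto& w : all_) w->wake.notify_one();
        }
        for (auto& w : all_) w->thread.join();
    }

    void assign(std::function<void()> job) {
        assert(job != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        if (idle_.empty()) {
            // Nobody on the idle list: hire a worker and hand it the job.
            all_.push_back(std::unique_ptr<worker>(new worker));
            worker* w = all_.back().get();
            w->job = std::move(job);
            w->thread = std::thread([this, w] { run(*w); });
        } else {
            // Pick a worker known to be idle and wake only that one.
            worker* w = idle_.back();
            idle_.pop_back();
            w->job = std::move(job);
            w->wake.notify_one();
        }
    }

    std::size_t hired() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return all_.size();
    }

    std::size_t idle_count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return idle_.size();
    }

private:
    void run(worker& w) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (w.job != nullptr) {
            std::function<void()> job = std::move(w.job);
            w.job = nullptr;
            lock.unlock();
            job();
            lock.lock();
            if (stopping_) return;
            idle_.push_back(&w);  // the worker registers itself as idle
            w.wake.wait(lock, [&] { return stopping_ || w.job != nullptr; });
        }
    }

    mutable std::mutex mutex_;
    bool stopping_ = false;
    std::vector<std::unique_ptr<worker>> all_;
    std::vector<worker*> idle_;
};
```

Because the idle list and the hiring decision share one mutex, the dealer never hires while a worker it knows to be idle is available. The cost is that every hand-off goes through that single lock.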

doron
My case is exactly the one you described; maybe I was not clear enough about it. I'm just pushing the job into the queue, and worker threads wait for a new job to be posted and wake up when they see there is something to do. But to satisfy my needs the dealer must create a thread when there are no free threads available, and the problem is that the dealer doesn't know whether there is an idle thread at the moment. The problem is how to check this. You can imagine it as 2 types of threads, BOSS and WORKERS: BOSS gives out tasks and WORKERS handle them, but when there is no free worker BOSS must hire someone.
ledokol
The message queue pattern is BOSS posts jobs on notice board and WORKERS take a job when they have finished the previous one. What is important is that the BOSS does not know how busy the workers are. My pattern is BOSS assigns tasks to individual workers (who he knows by name) and hires another WORKER if he cannot find one that he knows to do the job.
doron
Maybe I don't understand you clearly. You wrote "Since the dealer knows which player is idle", but how does it know?
ledokol
"and hires another WORKER if he cannot find one that he knows to do the job" - yes, that's what I want, the problem is that I don't know how to find out if there are idle workers.
ledokol
You have a list of idle WORKERS. When a WORKER finishes its job it adds itself to the idle WORKERS list (atomically of course)
doron
Hmmm... sounds good, but I don't actually need to know the concrete worker, right? The worker takes the job from the queue itself. All I need is to know that there are free workers and that my signal (notify_one) was not ignored. A simple volatile integer solves that just as a list of free workers does (see my answer), but there is still a possible small overhead. The solution to that would be one more synchronization, to make thread creation and checking for free threads atomic, but wouldn't that be even worse in terms of performance and code complexity?
ledokol
A: 

I have now found one solution, but it's not perfect. I have a volatile member variable named free that stores the number of free threads in the pool.

void thread_pool::thread_function()
{
    free++;
    command_queue::handler_t handler;
    while (handler = tasks.pop(timeout))
    {
        free--;
        handler();
        free++;
    }
    free--;
}

When I assign a task to the thread I do something like this:

if (free == 0)
    threads.create_thread(boost::bind(&thread_pool::thread_function, this));

There is still an issue with synchronization: if the context is switched right after free-- in thread_function, we might create a new thread that we don't actually need. Since the tasks queue is thread-safe this causes no real problem, it's just unwanted overhead. Can you suggest a solution to that, and what do you think about it? Maybe it's better to leave it as it is than to have one more synchronization here?
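One possible way to close that race, sketched under assumptions and not taken from the thread's code: keep the job queue, the idle counter and the spawn decision under a single mutex, so "is anyone waiting?" and "push the job" become one atomic step. Note that `volatile` does not make `free++` atomic in C++, so the counter needs a lock or `std::atomic` in any case. The class `on_demand_pool` and its members are hypothetical, and it uses C++11 standard threads in place of Boost.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch: queue, idle count and spawn decision share one mutex.
class on_demand_pool {
public:
    explicit on_demand_pool(std::chrono::milliseconds idle_timeout)
        : idle_timeout_(idle_timeout) {}

    ~on_demand_pool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        // Simplification: threads that already timed out are only joined here.
        for (auto& t : threads_) t.join();
    }

    void start(std::function<void()> job) {
        assert(job != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        jobs_.push_back(std::move(job));
        if (jobs_.size() > idle_) {
            // Fewer waiting workers than queued jobs: create a thread.
            ++spawned_;
            threads_.emplace_back([this] { worker(); });
        } else {
            wake_.notify_one();
        }
    }

    std::size_t spawned() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return spawned_;
    }

private:
    void worker() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            ++idle_;  // only ever changed while holding mutex_
            wake_.wait_for(lock, idle_timeout_,
                           [this] { return stopping_ || !jobs_.empty(); });
            --idle_;
            if (jobs_.empty()) return;  // idle timeout or shutdown
            std::function<void()> job = std::move(jobs_.front());
            jobs_.pop_front();
            lock.unlock();
            job();
            lock.lock();
        }
    }

    const std::chrono::milliseconds idle_timeout_;
    mutable std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<std::function<void()>> jobs_;
    std::size_t idle_ = 0;
    std::size_t spawned_ = 0;
    bool stopping_ = false;
    std::vector<std::thread> threads_;
};
```

Comparing `jobs_.size()` with `idle_` instead of testing `idle_ == 0` covers the case where a second job arrives before the notified worker has actually woken up. The extra synchronization is the lock that the queue needs anyway, so it should add little cost beyond what command_queue already pays.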

ledokol
A: 

Another idea. You can query the length of the Message Queue. If it gets too long, create a new worker.
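As a tiny illustration of that policy (the function name and the `max_backlog` knob are assumptions for this example), the decision reduces to a comparison made whenever a job is queued:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical policy helper: add a worker once the backlog exceeds a limit.
// `max_backlog` is an assumed tuning parameter, not something from the thread.
inline bool should_add_worker(std::size_t queued_jobs, std::size_t max_backlog) {
    return queued_jobs > max_backlog;
}
```

With `max_backlog` set to 0 this hires whenever anything is queued, but on its own it still cannot tell whether an idle worker is about to pick the job up.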

doron
It is possible, but this doesn't guarantee that all tasks will be started immediately. In this case you might request an operation that will be performed only after some others complete. This doesn't solve the problem. The dealer wants all tasks to be performed immediately, without any delay, even if that is not efficient in terms of using more threads than processors/cores.
ledokol
Then take a look at my other answer
doron