I am about to implement a worker thread with work item queuing, and while I was thinking about the problem, I wanted to know if I'm doing the best thing.

The thread in question will have some thread-local data (preinitialized at construction) and will loop on work items until some condition is met.

pseudocode:

volatile bool run = true;

int WorkerThread(param)
{
    localclassinstance *c1 = new localclassinstance();
    [other initialization]

    while(true) {
        [LOCK]
        [unqueue work item]
        [UNLOCK]
        if([hasWorkItem]) {
            [process data]
            [PostMessage with pointer to data]
        }
        [Sleep]

        if(!run)
            break;
    }

    [uninitialize]
    return 0;
}

I guess I will do the locking via a critical section, as the queue will be a std::vector or std::queue, but maybe there is a better way.

The part with Sleep doesn't look too great: there will be a lot of extra latency with big Sleep values, or lots of extra locking when the Sleep value is small, and that's definitely unnecessary.

But I can't think of a WaitForSingleObject-friendly primitive I could use instead of a critical section, as there might be two threads queuing work items at the same time. So an Event, which seems to be the best candidate, can lose the second work item if the Event was already set, and it doesn't guarantee mutual exclusion.

Maybe there is an even better approach with InterlockedExchange-style functions that leads to even less serialization.

P.S.: I might need to preprocess the whole queue and drop the obsolete work items during the unqueuing stage.

+1  A: 

There are various ways to do this.

For one, you could create an event called 'run' and use it to detect when the thread should terminate; the main thread signals it when it's time to quit. Instead of Sleep you would then use WaitForSingleObject with a timeout, so the thread quits immediately instead of waiting out the full sleep interval.
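
A minimal sketch of that idea (the handle name and the 100 ms timeout are illustrative choices, not part of the answer):

// main thread: create a manual-reset quit event, initially unsignalled
HANDLE hQuit = CreateEvent(NULL, TRUE, FALSE, NULL);
// ... pass hQuit to the worker; SetEvent(hQuit) when it's time to shut down ...

// worker thread: wait up to 100 ms instead of Sleep(100)
while (WaitForSingleObject(hQuit, 100) == WAIT_TIMEOUT)
{
    // timeout elapsed with no quit signal: lock, dequeue, process
}
// hQuit was signalled: uninitialize and return immediately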

Another way is to accept messages in your loop and invent a user-defined message that you post to the thread.

EDIT: depending on the situation it may also be wise to have yet another thread that monitors this one to check whether it is dead or not; this can be done with the above-mentioned message queue, so that replying to a certain message within x ms would mean the thread hasn't locked up.

Anders K.
A: 

Use a semaphore instead of an event.

DanDan
in this case I think an event would suffice because it's boolean in nature.
Anders K.
+1  A: 

I'd restructure a bit:

WorkItem GetWorkItem()
{
    while(true)
    {
        WaitForSingleObject(queue.Ready, INFINITE);
        {
            ScopeLock lock(queue.Lock);
            if(!queue.IsEmpty())
            {
                return queue.GetItem();
            }
        }
    }
}

int WorkerThread(param) 
{ 
    bool done = false;
    do
    {
        WorkItem work  = GetWorkItem();
        if( work.IsQuitMessage() )
        {
            done = true;
        }
        else
        {
            work.Process();
        }
    } while(!done);

    return 0; 
} 

Points of interest:

  1. ScopeLock is a RAII class to make critical section usage safer.
  2. Block on the event until a work item is (possibly) ready - then lock while trying to dequeue it.
  3. Don't use a global "IsDone" flag; enqueue special quit-message WorkItems instead.
snemarch
>WaitForSingleObject(queue.Ready); - is that a wait on an Event? What if thread 1 queues an item and signals the event while, at the same time, thread 2 queues an item and also signals the event? The worker thread resumes, processes one item, resets the event and goes into WaitForSingleObject again, with one item left dangling. The RAII and quit-message are nice ideas.
Madman
If memory serves me right, the following should work: use a manual-reset event, and reset it during dequeue when the work item count drops to zero.
snemarch
+4  A: 

There are a multitude of ways to do this.

One option is to use a semaphore for the waiting. The semaphore is signalled every time a value is pushed on the queue, so the worker thread will only block if there are no items in the queue. This will still require separate synchronization on the queue itself.
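
A minimal sketch of the semaphore option, with a hypothetical WorkItem type (error handling omitted):

#include <windows.h>
#include <climits>
#include <queue>

struct WorkItem;

HANDLE g_hItems = CreateSemaphore(NULL, 0, LONG_MAX, NULL); // count = queued items
CRITICAL_SECTION g_cs;        // guards g_queue; InitializeCriticalSection'd at startup
std::queue<WorkItem*> g_queue;

void Enqueue(WorkItem* item)
{
    EnterCriticalSection(&g_cs);
    g_queue.push(item);
    LeaveCriticalSection(&g_cs);
    ReleaseSemaphore(g_hItems, 1, NULL); // one permit per queued item
}

WorkItem* Dequeue()
{
    WaitForSingleObject(g_hItems, INFINITE); // blocks only while the queue is empty
    EnterCriticalSection(&g_cs);
    WorkItem* item = g_queue.front();
    g_queue.pop();
    LeaveCriticalSection(&g_cs);
    return item;
}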

A second option is to use a manual-reset event which is set when there are items in the queue and cleared when the queue is empty. Again, you will need to do separate synchronization on the queue.

A third option is to have an invisible message-only window created on the thread, and use a special WM_USER or WM_APP message to post items to the queue, attaching the item to the message via a pointer.
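
A rough sketch of the message-only window approach; WorkItem, WM_WORKITEM and the class name are illustrative, not from the answer:

#include <windows.h>

struct WorkItem { /* job data */ };
void ProcessItem(WorkItem* item);

const UINT WM_WORKITEM = WM_APP + 1;

LRESULT CALLBACK WorkWndProc(HWND hwnd, UINT msg, WPARAM wp, LPARAM lp)
{
    if (msg == WM_WORKITEM)
    {
        ProcessItem(reinterpret_cast<WorkItem*>(lp));
        return 0;
    }
    return DefWindowProc(hwnd, msg, wp, lp);
}

DWORD WINAPI WorkerThread(LPVOID)
{
    WNDCLASS wc = {0};
    wc.lpfnWndProc   = WorkWndProc;
    wc.hInstance     = GetModuleHandle(NULL);
    wc.lpszClassName = TEXT("WorkQueueWnd");
    RegisterClass(&wc);

    // HWND_MESSAGE as parent makes this a message-only window
    HWND hwnd = CreateWindow(wc.lpszClassName, NULL, 0, 0, 0, 0, 0,
                             HWND_MESSAGE, NULL, wc.hInstance, NULL);

    MSG msg;
    while (GetMessage(&msg, NULL, 0, 0) > 0) // PostQuitMessage ends the loop
        DispatchMessage(&msg);
    return 0;
}

// producers post items to the window:
//   PostMessage(hwnd, WM_WORKITEM, 0, reinterpret_cast<LPARAM>(item));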

Another option is to use condition variables. The native Windows condition variables only work if you're targeting Windows Vista or Windows 7, but condition variables are also available for Windows XP with Boost or an implementation of the C++0x thread library. An example queue using Boost condition variables is available on my blog: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

Anthony Williams
The Event solution seems to be the best, but unfortunately it seems I need to do more studying and refactoring because of COM: CoInitialize seems to be incompatible with WaitForSingleObject. Condition variables look nice, but I have no Boost dependency in this project and I have to support WinXP.
Madman
IIRC, COM requires a message pump, so use `MsgWaitForMultipleObjects` instead, and pump windows messages if you get any.
Anthony Williams
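
A sketch of that pump-while-waiting pattern (hWorkEvent stands in for whatever handle you wait on):

DWORD r = MsgWaitForMultipleObjects(1, &hWorkEvent, FALSE, INFINITE, QS_ALLINPUT);
if (r == WAIT_OBJECT_0)
{
    // the work event was signalled: dequeue and process
}
else if (r == WAIT_OBJECT_0 + 1)
{
    // a window message arrived: pump it so COM keeps working
    MSG msg;
    while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
        DispatchMessage(&msg);
}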
if COM is a requirement, then you might as well just post thread messages, or use a message-only (HWND_MESSAGE) window as the destination, as the windows message pump mechanism has already been instantiated on the thread.
Chris Becke
Yes. Message-only windows (with a parent of `HWND_MESSAGE`) are much better than thread messages, as they don't get lost unless the queue overflows, whereas thread messages can get lost if messages are handled anywhere except the top-level message loop.
Anthony Williams
A: 

Keep the signaling and synchronizing separate. Something along these lines...

// in main thread

HANDLE events[2];
events[0] = CreateEvent(...); // for shutdown
events[1] = CreateEvent(...); // for work to do

// start thread and pass the events

// in worker thread

DWORD ret;
while (true)
{
   ret = WaitForMultipleObjects(2, events, FALSE, timeoutMs /* or INFINITE */);

   if (ret == WAIT_OBJECT_0)            // shutdown event signalled
      return 0;
   else if (ret == WAIT_OBJECT_0 + 1)   // work-to-do event signalled
   {
      EnterCriticalSection(&cs);        // cs guards the work queue
      // unqueue work item
      LeaveCriticalSection(&cs);
      // process the item, post results, etc.
   }
   else if (ret == WAIT_TIMEOUT)
   {
      // do something else that has to be done
   }
}
dgnorton
+2  A: 

The fastest locking primitive is usually a spin-lock or spin-sleep-lock. CRITICAL_SECTION is just such a (user-space) spin-sleep-lock. (Well, aside from not using locking primitives at all, of course - but that means using lock-free data structures, and those are really, really hard to get right.)

As for avoiding the Sleep: have a look at condition variables. They're designed to be used together with a "mutex", and I think they're much easier to use correctly than Windows' EVENTs.

Boost.Thread has a nice portable implementation of both fast user-space spin-sleep-locks and condition variables:

http://www.boost.org/doc/libs/1_44_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

A work-queue using Boost.Thread could look something like this:

#include <cstddef>
#include <deque>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>

template <class T>
class Queue : private boost::noncopyable
{
public:
    explicit Queue(size_t maxSize) : m_maxSize(maxSize) {} // the const member must be initialized

    void Enqueue(T const& t)
    {
        unique_lock lock(m_mutex);

        // wait until the queue is not full
        while (m_backingStore.size() >= m_maxSize)
            m_queueNotFullCondition.wait(lock); // releases the lock temporarily

        m_backingStore.push_back(t);
        m_queueNotEmptyCondition.notify_all(); // notify waiters that the queue is not empty
    }

    T DequeueOrBlock()
    {
        unique_lock lock(m_mutex);

        // wait until the queue is not empty
        while (m_backingStore.empty())
            m_queueNotEmptyCondition.wait(lock); // releases the lock temporarily

        T t = m_backingStore.front();
        m_backingStore.pop_front();

        m_queueNotFullCondition.notify_all(); // notify waiters that the queue is not full

        return t;
    }

private:
    typedef boost::recursive_mutex mutex;
    typedef boost::unique_lock<boost::recursive_mutex> unique_lock;

    size_t const m_maxSize;

    mutex mutable m_mutex;
    boost::condition_variable_any m_queueNotEmptyCondition;
    boost::condition_variable_any m_queueNotFullCondition;

    std::deque<T> m_backingStore;
};
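
Hypothetical usage of the queue above (the constructor argument is the capacity):

Queue<int> q(100);  // hold at most 100 items

// producer thread:
q.Enqueue(42);      // blocks while the queue is full

// worker thread:
int item = q.DequeueOrBlock(); // blocks while the queue is empty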
pgroke
Do you know of a tutorial explaining condition variables? I've yet to see a condition variable example demonstrating how they are better than a Windows semaphore or event object. Oh sure, there is an associated critical section object, but how to utilize this to achieve something is entirely unobvious to the uninitiated.
Chris Becke
+1  A: 

You can have a look at another approach that uses C++0x atomic operations here:

http://www.drdobbs.com/high-performance-computing/210604448

David
Interesting link.
Madman
A: 

Given that this question is tagged windows, I'll answer thus:

Don't create just one worker thread. Your worker-thread jobs are presumably independent, so you can process multiple jobs at once? If so:

  • In your main thread, call CreateIoCompletionPort to create an I/O completion port object.
  • Create a pool of worker threads. The number you need depends on how many jobs you might want to service in parallel; some multiple of the number of CPU cores is a good start.
  • Each time a job comes in, call PostQueuedCompletionStatus(), passing a pointer to the job struct as the lpOverlapped parameter.
  • Each worker thread calls GetQueuedCompletionStatus(), retrieves the work item from the lpOverlapped pointer and does the job before returning to GetQueuedCompletionStatus. (A sketch of this design follows below.)
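
A sketch of that design, assuming a hypothetical Job struct (error handling omitted):

#include <windows.h>

struct Job { /* job data */ };
void ProcessJob(Job* job);

HANDLE g_iocp;

DWORD WINAPI PoolThread(LPVOID)
{
    DWORD bytes;
    ULONG_PTR key;
    LPOVERLAPPED ov;
    while (GetQueuedCompletionStatus(g_iocp, &bytes, &key, &ov, INFINITE))
    {
        if (ov == NULL)
            break; // a posted null pointer serves as the shutdown signal
        ProcessJob(reinterpret_cast<Job*>(ov)); // job pointer travels in lpOverlapped
    }
    return 0;
}

void StartPool(DWORD concurrency, DWORD threadCount)
{
    // last parameter: how many threads may run concurrently
    g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, concurrency);
    for (DWORD i = 0; i < threadCount; ++i)
        CloseHandle(CreateThread(NULL, 0, PoolThread, NULL, 0, NULL));
}

void SubmitJob(Job* job)
{
    PostQueuedCompletionStatus(g_iocp, 0, 0, reinterpret_cast<LPOVERLAPPED>(job));
}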

This looks heavyweight, but I/O completion ports are implemented in kernel mode and represent a queue that can be dequeued from any of the worker threads associated with it (i.e. waiting on a call to GetQueuedCompletionStatus). The completion port knows how many of the threads processing an item are actually using a CPU versus blocked on an I/O call - and will release more worker threads from the pool to ensure that the concurrency count is met.

So it's not lightweight, but it is very, very efficient. I/O completion ports can be associated with pipe and socket handles, for example, and can dequeue the results of asynchronous operations on those handles. I/O completion port designs can scale to handling tens of thousands of socket connections on a single server - but on the desktop side of the world they make a very convenient way of scaling processing of jobs over the 2 or 4 cores now common in desktop PCs.

Chris Becke
The biggest problem with I/O completion port based designs written in modern C++ is that all threads in a process share access to a single heap, and the STL tends to thrash the heap a lot. As a result you can find performance limited by the worker threads competing for heap locks more than anything else.
Chris Becke
Unfortunately, I have to use COM components, and thread pools are not very friendly with them.
Madman
Without knowing how you are using the COM objects, that's difficult to judge. If the COM objects form part of the job - yes, problem. If the COM objects are used to do the job, then each worker thread is its own apartment; you just need a TLS store to get the worker objects relevant to the current thread.
Chris Becke
+1  A: 

It is possible to share a resource between threads without using blocking locks at all, if your scenario meets certain requirements.

You need an atomic pointer exchange primitive, such as Win32's InterlockedExchange. Most processor architectures provide some sort of atomic swap, and it's usually much less expensive than acquiring a formal lock.

You can store your queue of work items in a pointer variable that is accessible to all the threads that will be interested in it. (global var, or field of an object that all the threads have access to)

This scenario assumes that the threads involved always have something to do, and only occasionally "glance" at the shared resource. If you want a design where threads block waiting for input, use a traditional blocking event object.

Before anything begins, create your queue or work item list object and assign it to the shared pointer variable.

Now, when producers want to push something onto the queue, they "acquire" exclusive access to the queue object by swapping a null into the shared pointer variable using InterlockedExchange. If the result of the swap is null, then somebody else is currently modifying the queue object. Sleep(0) to release the rest of your thread's time slice, then loop to retry the swap until it returns non-null. Even if you end up looping a few times, this is many, many times faster than making a kernel call to acquire a mutex object. Kernel calls require hundreds of clock cycles to transition into kernel mode.

When you successfully obtain the pointer, make your modifications to the queue, then swap the queue pointer back into the shared pointer.

When consuming items from the queue, you do the same thing: swap a null into the shared pointer and loop until you get a non-null result, operate on the object in the local var, then swap it back into the shared pointer var.
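
A sketch of the swap-a-null-in technique; the helper names and the queue type are illustrative only:

#include <windows.h>
#include <queue>

struct WorkItem;

// the shared pointer variable; NULL means some thread currently holds the queue
std::queue<WorkItem*>* volatile g_sharedQueue = new std::queue<WorkItem*>();

std::queue<WorkItem*>* AcquireQueue()
{
    for (;;)
    {
        void* p = InterlockedExchangePointer((PVOID volatile*)&g_sharedQueue, NULL);
        if (p != NULL)
            return static_cast<std::queue<WorkItem*>*>(p); // we own the queue now
        Sleep(0); // another thread holds it: give up the time slice and retry
    }
}

void ReleaseQueue(std::queue<WorkItem*>* q)
{
    // swap the queue pointer back so other threads can acquire it
    InterlockedExchangePointer((PVOID volatile*)&g_sharedQueue, q);
}

void Push(WorkItem* item)
{
    std::queue<WorkItem*>* q = AcquireQueue();
    q->push(item);
    ReleaseQueue(q);
}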

This technique is a combination of atomic swap and brief spin loops. It works well in scenarios where the threads involved are not blocked and collisions are rare. Most of the time the swap will give you exclusive access to the shared object on the first try, and as long as the length of time the queue object is held exclusively by any thread is very short then no thread should have to loop more than a few times before the queue object becomes available again.

If you expect a lot of contention between threads in your scenario, or you want a design where threads spend most of their time blocked waiting for work to arrive, you may be better served by a formal mutex synchronization object.

dthorpe