views:

91

answers:

1

Hi,

I have a program with several worker threads, and a main thread that receives jobs. In the main thread I want to queue the jobs onto a synchronized queue, and have the worker threads sitting there all waiting on the queue. When there's something in the queue I want the worker to pull the job from the queue, while the remaining work sits there waiting for another job.

I found CreateMsgQueue (http://msdn.microsoft.com/en-us/library/ms885180.aspx) however this appears to only exist for Windows CE.

I know I could write this myself, however if something already existed I'd be a fool not to use it.

I am developing in c++ using Visual Studio 2005.

Any suggestions gratefully received.

Thanks Rich

+1  A: 

Windows doesn't provide exactly what you want. What it does provide is thread pools -- with these, you not only don't have to create the queue yourself, but you don't have to create or (directly) manage the threads either.

Of course, synchronized queues do exist too, just not as part of Windows. One I wrote looks like this:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif
Jerry Coffin
Thanks Jerry, really nice solution. I think I like your queue more than thread pooling.
Rich
You can also achieve pretty much the same with completion ports. I wrote an example showing how to do it several years ago: http://blogs.msdn.com/b/larryosterman/archive/2004/03/29/101329.aspx
Larry Osterman
Im with Larry on this one. Don't re-invent the wheel: Completion ports implement a queues job dispatching mechanism already.
Chris Becke
I'm with Chris and Larry. Use completion ports.
Len Holgate
@Larry: If you can, you might want to go back and correct a fundamental problem with that blog entry. You start off by talking about the boxcar problem with the event in the manually written queue. In fact, this problem will only arise if you *make* it happen by creating a manual reset even instead of an auto reset event. With an auto reset event, exactly *one* waiting thread will be awakened each time the event is set, so the problem never arises.
Jerry Coffin
@Chris, Len: There's nothing wrong with re-inventing the wheel, especially when the one you're replacing is rectangular.
Jerry Coffin
@Larry: Sorry, I meant to add: I know you mention auto-reset events, but you start with an assumption of a manual-reset event, which makes no sense in this situation (a manual-reset event is specifically for situations where you really *want* to wake up all waiting threads). Using an auto-reset event for this job isn't a band-aid -- it's just using the right tool for the job.
Jerry Coffin
@Jerry: If you use an auto-reset event, you have different scalability problem - if there are a large number of elements inserted in the queue at the same time, you only wake up one thread which means you spend excess time in the kernel waking up threads (this is largely fixed in Win7 with the removal of the dispatcher spin lock).
Larry Osterman
@Larry: It is true that an auto-reset event isn't exactly ideal. As you can see in the code above, the *right* choice is to use a counted semaphore, so that inserting N items wakes (up to) N threads. The bottom line is that it's a straw man -- a design that nobody would ever really try in the first place because it's obviously wrong -- but still used as an excuse to dismiss correct designs along with the bad ones.
Jerry Coffin
Jerry, IMHO IOCP are generally better for this kind of thing due to how they limit the number of threads that run and keep the same threads running and so prevent unnecessary context switches where possible. You simply can't achieve the same in user mode code and so they'll always have an inherent advantage that will make them a better choice than hand rolled code; simply wrap them in to make them conform to your required interface.
Len Holgate