I need to build a system of workers (represented as threads) and multiple queues. Each job waits in one of the queues for a worker thread to process it. Each worker can process jobs only from some of the queues. No spin-waiting. C/C++, pthreads, standard POSIX.

The problem for me is the "multiple queues" part. I know how to implement this with a single queue, but here each worker needs to wait on all the queues it can process (i.e. wait for ANY of them).

On Windows I would use WaitForMultipleObjects, but this needs to be multi-platform.

I don't want any particular code for this, only a hint or a description of the model which I should use. Thanks in advance.

+1  A: 

If there aren't too many workers for each queue, you could make a condition variable for each worker.
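A minimal sketch of the per-worker condition variable idea in C with pthreads. Assumptions not in the answer: a small fixed number of queues and workers (NQUEUES, NWORKERS), fixed-capacity ring buffers, jobs represented as plain ints, and a per-worker bitmask `mask` where bit i means "may serve queue i". All names (pool_t, pool_add_worker, pool_push, pool_take) are hypothetical. One mutex guards everything; the producer signals only one idle worker that can serve the queue it just pushed to, and workers always rescan before sleeping, so a job pushed while no eligible worker is idle is picked up when one returns.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define NQUEUES  4   /* assumption: small fixed number of queues */
#define NWORKERS 8   /* assumption: fixed maximum number of workers */
#define QCAP     64  /* assumption: fixed capacity per queue */

typedef struct { int items[QCAP]; int head, count; } queue_t;

typedef struct {
    pthread_cond_t wake;  /* this worker's private condition variable */
    unsigned mask;        /* bit i set => this worker may serve queue i */
    bool idle;            /* true while blocked in pthread_cond_wait */
} worker_t;

typedef struct {
    pthread_mutex_t lock; /* one mutex guards queues and worker state */
    queue_t q[NQUEUES];
    worker_t w[NWORKERS];
    int nworkers;
} pool_t;

void pool_init(pool_t *p) {
    pthread_mutex_init(&p->lock, NULL);
    for (int i = 0; i < NQUEUES; i++) p->q[i].head = p->q[i].count = 0;
    p->nworkers = 0;
}

/* Register a worker before starting threads; returns its index. */
int pool_add_worker(pool_t *p, unsigned mask) {
    assert(p->nworkers < NWORKERS);
    worker_t *w = &p->w[p->nworkers];
    pthread_cond_init(&w->wake, NULL);
    w->mask = mask;
    w->idle = false;
    return p->nworkers++;
}

/* Enqueue a job on queue qi and wake exactly one idle worker that can
   serve qi, if there is one. Returns false if the queue is full. */
bool pool_push(pool_t *p, int qi, int job) {
    assert(qi >= 0 && qi < NQUEUES);
    pthread_mutex_lock(&p->lock);
    queue_t *q = &p->q[qi];
    bool ok = q->count < QCAP;
    if (ok) {
        q->items[(q->head + q->count) % QCAP] = job;
        q->count++;
        for (int i = 0; i < p->nworkers; i++) {
            worker_t *w = &p->w[i];
            if (w->idle && (w->mask & (1u << qi))) {
                w->idle = false;              /* don't pick it twice */
                pthread_cond_signal(&w->wake);
                break;
            }
        }
    }
    pthread_mutex_unlock(&p->lock);
    return ok;
}

/* Called by worker `id`: block until one of its queues has a job. */
int pool_take(pool_t *p, int id) {
    worker_t *w = &p->w[id];
    pthread_mutex_lock(&p->lock);
    for (;;) {
        for (int i = 0; i < NQUEUES; i++) {
            queue_t *q = &p->q[i];
            if ((w->mask & (1u << i)) && q->count > 0) {
                int job = q->items[q->head];
                q->head = (q->head + 1) % QCAP;
                q->count--;
                pthread_mutex_unlock(&p->lock);
                return job;
            }
        }
        w->idle = true;                       /* advertise: I can be woken */
        pthread_cond_wait(&w->wake, &p->lock);
        w->idle = false;                      /* covers spurious wakeups too */
    }
}
```

If another worker grabs the job first, the woken worker simply rescans and goes back to sleep. Shutdown handling is omitted to keep the sketch short.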

Dave
A: 

Sounds like you should use boost::thread, boost::condition and a std::queue.

Brian R. Bondy
+3  A: 

What you can do is use a condition variable. Have your worker threads wait on a condition variable. When a job gets added to any of the job queues, signal the condition variable. Then, when the worker thread wakes up, it checks the queues it's waiting on. If any of them have a job, it takes that job off the queue. Otherwise, it goes back to waiting on the condition variable. Waiting on a condition variable puts the thread to sleep, so it does not consume CPU time.

Of course, it goes without saying that you should protect all accesses to the job queues with a mutex (e.g. pthread_mutex_t).
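A minimal sketch of this single-condition-variable approach in C with pthreads. Assumptions not in the answer: a fixed number of queues (NQUEUES), fixed-capacity ring buffers, jobs represented as plain ints, and a per-worker bitmask `mask` where bit i means "may serve queue i". The names pool_t, pool_push, pool_take and pool_shutdown are hypothetical. It uses pthread_cond_broadcast, because a single pthread_cond_signal might wake a worker that cannot serve the queue that just received a job.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define NQUEUES 4   /* assumption: small fixed number of queues */
#define QCAP    64  /* assumption: fixed capacity per queue */

/* A tiny ring buffer of job ids; a "job" is just an int here. */
typedef struct { int items[QCAP]; int head, count; } queue_t;

typedef struct {
    pthread_mutex_t lock;     /* one mutex guards every queue */
    pthread_cond_t  changed;  /* one condvar: "some queue got a job" */
    queue_t q[NQUEUES];
    bool shutdown;
} pool_t;

void pool_init(pool_t *p) {
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->changed, NULL);
    for (int i = 0; i < NQUEUES; i++) p->q[i].head = p->q[i].count = 0;
    p->shutdown = false;
}

/* Producer: enqueue job on queue qi and wake every waiting worker,
   since we don't know which of them may serve qi. */
bool pool_push(pool_t *p, int qi, int job) {
    assert(qi >= 0 && qi < NQUEUES);
    pthread_mutex_lock(&p->lock);
    queue_t *q = &p->q[qi];
    bool ok = q->count < QCAP;
    if (ok) {
        q->items[(q->head + q->count) % QCAP] = job;
        q->count++;
        pthread_cond_broadcast(&p->changed);
    }
    pthread_mutex_unlock(&p->lock);
    return ok;
}

/* Worker: block until a job appears in any queue whose bit is set in
   mask. Returns true with *job filled in, or false after shutdown once
   this worker's queues are empty. */
bool pool_take(pool_t *p, unsigned mask, int *job) {
    pthread_mutex_lock(&p->lock);
    for (;;) {
        for (int i = 0; i < NQUEUES; i++) {
            queue_t *q = &p->q[i];
            if ((mask & (1u << i)) && q->count > 0) {
                *job = q->items[q->head];
                q->head = (q->head + 1) % QCAP;
                q->count--;
                pthread_mutex_unlock(&p->lock);
                return true;
            }
        }
        if (p->shutdown) break;
        /* Nothing for us: sleep. Spurious or irrelevant wakeups just rescan. */
        pthread_cond_wait(&p->changed, &p->lock);
    }
    pthread_mutex_unlock(&p->lock);
    return false;
}

void pool_shutdown(pool_t *p) {
    pthread_mutex_lock(&p->lock);
    p->shutdown = true;
    pthread_cond_broadcast(&p->changed);
    pthread_mutex_unlock(&p->lock);
}
```

A worker thread would then loop on something like `while (pool_take(&pool, my_mask, &job)) process(job);`, where `my_mask` and `process` are hypothetical placeholders for the worker's permissions and its job handler.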

Adam Rosenfield
Good idea, but this will wake up all worker threads when adding a job to an empty queue. I suppose I will do it this way anyway, but is there a way to wake up only the "correct" thread (and ideally only one of all the correct threads)?
pthread_cond_signal() will wake up "at least one" thread waiting on the condition variable, whereas pthread_cond_broadcast() will wake up all threads. In theory, pthread_cond_signal() should only wake up one thread most of the time, but I don't know if that's true or not in practice.
Adam Rosenfield
But in your solution I *need* to wake up all the threads, because a job might be in a queue that the *one* worker doesn't process.
A: 

What I would do is use boost::asio to queue up data so that your multiple threads can have a go at it. You can pass a reference to the queue via post() and have the thread process it accordingly.

PiNoYBoY82
A: 

Instead of having a separate lock for each queue, why not have one lock for all queues?

  1. Wait on the lock
  2. Get the lock
  3. Dequeue from whichever queue
  4. Release the lock
  5. Process the dequeued item
  6. Goto step 1

Assuming that the dequeue takes negligible time (so the lock is only ever held for negligible time) there may be no need for more than one lock.
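Steps 2 to 4 can be sketched as one short critical section in C. Assumptions not in the answer: a fixed number of fixed-capacity queues, int jobs, and a worker bitmask `mask` selecting which queues it may take from; enqueue and try_dequeue_any are hypothetical names. Note that try_dequeue_any does not block, so calling it in a loop while it returns false would spin, which is the objection raised in the comment that follows.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define NQUEUES 3   /* assumption: fixed number of queues */
#define QCAP    16  /* assumption: fixed capacity per queue */

typedef struct { int items[QCAP]; int head, count; } queue_t;

/* One lock for all queues (static initialization keeps the sketch short). */
static pthread_mutex_t all_queues_lock = PTHREAD_MUTEX_INITIALIZER;
static queue_t queues[NQUEUES];

void enqueue(int qi, int job) {
    assert(qi >= 0 && qi < NQUEUES);
    pthread_mutex_lock(&all_queues_lock);
    queue_t *q = &queues[qi];
    assert(q->count < QCAP);
    q->items[(q->head + q->count) % QCAP] = job;
    q->count++;
    pthread_mutex_unlock(&all_queues_lock);
}

/* Steps 1-4: take the single lock, dequeue from whichever allowed queue
   has work, release the lock. Returns false if nothing was found. The
   caller processes the job (step 5) outside the lock. */
bool try_dequeue_any(unsigned mask, int *job) {
    bool found = false;
    pthread_mutex_lock(&all_queues_lock);
    for (int i = 0; i < NQUEUES && !found; i++) {
        queue_t *q = &queues[i];
        if ((mask & (1u << i)) && q->count > 0) {
            *job = q->items[q->head];
            q->head = (q->head + 1) % QCAP;
            q->count--;
            found = true;
        }
    }
    pthread_mutex_unlock(&all_queues_lock);
    return found;
}
```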

ChrisW
I don't think that will work. A worker only processes jobs from specific queues. If the only job is in a "bad" queue (one this worker can't process), the worker will busy-wait until another worker takes the job.
A: 

Consider using http://www.codeproject.com/KB/threads/lwsync.aspx.

Mykola Golubyev
A: 

You could do something like this: each job has a "queue" associated with it. For example:

Say you have 2 queues. Your jobs could say:

job[0].queue = 1; /* That job is in queue 1 */
job[1].queue = 1;
job[2].queue = 2; /* this job is in queue 2 */
... 
etc

So then you have your "bag of threads". A thread simply picks a job - say, job[2]. If that thread is only allowed to process jobs from queue 1, then it puts that job back into the ready queue and chooses a different job.

So each thread knows which queues it is allowed to process, and when it chooses a job, it makes sure that job's "queue" field matches. If it doesn't, it chooses a different job.

(This is in many ways how process scheduling works in Linux on multiple cores. Each process has a bitmask saying which processors it is allowed to run on, and the processor makes sure it is "allowed" to run that process before doing so.)
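A minimal sketch of the job-selection step in C, using an affinity-style bitmask as the analogy suggests. Assumptions not in the answer: jobs live in one shared array ("bag"), each tagged with a `queue` number from 0 to 31, and `job_t`, `allowed` and `pick_job` are hypothetical names. Instead of literally putting a disallowed job back and picking again, it scans past disallowed jobs, which has the same effect. Locking and waiting are omitted; the caller is assumed to hold whatever lock protects the bag.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    int id;
    int queue;   /* which logical queue this job belongs to (0..31) */
} job_t;

/* A worker's permissions, like a CPU affinity mask: bit q set means the
   worker may run jobs tagged with queue q. */
static int allowed(unsigned mask, const job_t *j) {
    return j->queue >= 0 && j->queue < 32 && (mask & (1u << j->queue)) != 0;
}

/* Find the first job in the bag this worker may run, remove it while
   preserving the order of the rest, and return 1; return 0 if no job
   matches. Caller must hold the bag's lock. */
int pick_job(job_t *bag, size_t *n, unsigned mask, job_t *out) {
    for (size_t i = 0; i < *n; i++) {
        if (allowed(mask, &bag[i])) {
            *out = bag[i];
            for (size_t k = i + 1; k < *n; k++) bag[k - 1] = bag[k];
            (*n)--;
            return 1;
        }
    }
    return 0;   /* nothing this worker may process */
}
```

A linear scan is fine for a small bag; with many jobs, separate per-queue lists (as in the other answers) avoid rescanning jobs a worker can never take.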

rascher
+1  A: 

If you would like to write lock-free code, then this is a must-read: Lock-Free Code: A False Sense of Security

+4  A: 

How about:

  • all worker threads wait on a semaphore
  • when anything is added to any of the queues, the semaphore is incremented, which wakes a single thread
  • the thread checks the queues it is interested in, takes a job from one of them, processes it and goes back to waiting on the semaphore

You will need additional mutex(es) to control the actual reads and writes to the queues.
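A minimal sketch of the semaphore scheme in C using a POSIX unnamed semaphore plus one mutex for the queues. Assumptions not in the answer: fixed-capacity int queues, a worker bitmask `mask`, and hypothetical names pool_t, pool_init, pool_push and pool_take. One limitation worth knowing: a semaphore token is not tied to a queue, so if the woken worker cannot serve any non-empty queue, that wakeup is consumed and the job waits until a later post wakes an eligible worker. Also, sem_init is not implemented on some platforms (notably macOS), so check its return value.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define NQUEUES 4   /* assumption: small fixed number of queues */
#define QCAP    64  /* assumption: fixed capacity per queue */

typedef struct { int items[QCAP]; int head, count; } queue_t;

typedef struct {
    sem_t pending;            /* counts jobs added but not yet claimed */
    pthread_mutex_t lock;     /* guards the queues themselves */
    queue_t q[NQUEUES];
} pool_t;

/* Returns 0 on success, -1 if sem_init is unsupported or fails. */
int pool_init(pool_t *p) {
    for (int i = 0; i < NQUEUES; i++) p->q[i].head = p->q[i].count = 0;
    pthread_mutex_init(&p->lock, NULL);
    return sem_init(&p->pending, 0, 0);   /* 0 = shared between threads only */
}

bool pool_push(pool_t *p, int qi, int job) {
    assert(qi >= 0 && qi < NQUEUES);
    pthread_mutex_lock(&p->lock);
    queue_t *q = &p->q[qi];
    bool ok = q->count < QCAP;
    if (ok) {
        q->items[(q->head + q->count) % QCAP] = job;
        q->count++;
    }
    pthread_mutex_unlock(&p->lock);
    if (ok) sem_post(&p->pending);         /* wakes at most one waiting worker */
    return ok;
}

/* Block on the semaphore, then take one job from an allowed queue.
   Returns false if this wakeup found nothing the worker may process
   (the token is consumed; see the limitation above). */
bool pool_take(pool_t *p, unsigned mask, int *job) {
    int rc;
    do { rc = sem_wait(&p->pending); } while (rc == -1 && errno == EINTR);
    if (rc != 0) return false;
    bool found = false;
    pthread_mutex_lock(&p->lock);
    for (int i = 0; i < NQUEUES && !found; i++) {
        queue_t *q = &p->q[i];
        if ((mask & (1u << i)) && q->count > 0) {
            *job = q->items[q->head];
            q->head = (q->head + 1) % QCAP;
            q->count--;
            found = true;
        }
    }
    pthread_mutex_unlock(&p->lock);
    return found;
}
```

This works well when every worker can serve every queue; with restricted workers, the condition-variable variants above avoid losing wakeups.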

anon
+1 I'll go with Neil on this one :)
Magnus Skog
A: 

Here's an article I found useful that implements Brian's idea of using boost::thread, boost::condition, and a std::queue: (I don't have enough points to add comments yet, otherwise I would have added this to Brian's answer)

Implementing a Thread-Safe Queue using Condition Variables

Mark