views:

535

answers:

3

I am implementing a C++ message queue based on a std::queue.

As I need popers to wait on an empty queue I was considering using mutex for mutual exclusion and cond for suspending threads on empty queue, as glib does with the gasyncqueue.

However it looks to me that a mutex&semaphore would do the job, I think it contains an integer and that seems like a pretty high number to reach on pending messages.

Pros of semaphore are that you don't need to check manually the condition each time you return from a wait, as you now for sure that someone inserted something(when someone inserted 2 items and you are the second thread arriving).

Which one would you choose?

EDIT: Changed the question in response to @Greg Rogers

A: 

If You want to allow multiple simultaneously users at a time to use your queue, you should use semaphores.

sema(10) // ten threads/process have the concurrent access.

sema_lock(&sema_obj)
queue
sema_unlock(&sema_obj)

Mutex will "authorize" only one user at a time.

pthread_mutex_lock(&mutex_obj)
global_data;
pthread_mutex_unlock(&mutex_obj)

That's the main difference and You should decide which solution will fit your requirements. But I'd choose mutex approach, because You don't need to specifies how many users can grab your resource.

bua
Arkaitz Jimenez
A: 

Personally I use a mutex to serialize access to the list, and wake up the consumer by sending a byte over a socket (produced by socketpair()). That may be somewhat less efficient than a semaphore or condition variable, but it has the advantage of allowing the consumer to block in select()/poll(). That way the consumer can also wait on other things besides the data queue, if it wants to. It also lets you use the exact same queueing code on almost all OS's, since practically every OS supports the BSD sockets API.

Psuedocode follows:

// Called by the producer.  Adds a data item to the queue, and sends a byte
// on the socket to notify the consumer, if necessary
void PushToQueue(const DataItem & di)
{
   mutex.Lock();
   bool sendSignal = (queue.size() == 0);
   queue.push_back(di);
   mutex.Unlock();
   if (sendSignal) producerSocket.SendAByteNonBlocking();
}

// Called by consumer after consumerSocket selects as ready-for-read
// Returns true if (di) was written to, or false if there wasn't anything to read after all
// Consumer should call this in a loop until it returns false, and then
// go back to sleep inside select() to wait for further data from the producer
bool PopFromQueue(DataItem & di)
{
   consumerSocket.ReadAsManyBytesAsPossibleWithoutBlockingAndThrowThemAway();
   mutex.Lock();
   bool ret = (queue.size() > 0);
   if (ret) queue.pop_front(di);
   mutex.Unlock();
   return ret;
}
Jeremy Friesner
+2  A: 

A single semaphore does not do the job - you need to be comparing (mutex + semaphore) and (mutex + condition variable).

It is pretty easy to see this by trying to implement it:

void push(T t)
{
    queue.push(t); 
    sem.post();
}

T pop()
{
    sem.wait();
    T t = queue.top();
    queue.pop();
    return t;
}

As you can see there is no mutual exclusion when you are actually reading/writing to the queue, even though the signalling (from the semaphore) is there. Multiple threads can call push at the same time and break the queue, or multiple threads could call pop at the same time and break it. Or, a thread could call pop and be removing the first element of the queue while another thread called push.

You should use whichever you think is easier to implement, I doubt performance will vary much if any (it might be interesting to measure though).

Greg Rogers
Hmm, you are right, I didn't ask it properly, I DO use a mutex for data accessing, apart from the semaphore.
Arkaitz Jimenez
Look at the source of http://code.google.com/p/litm/ in the queue.c file.
jldupont