I'm trying to write a thread-safe queue using pthreads in C++. My program works 93% of the time. The other 7% of the time it either spits out garbage or seems to fall asleep. I'm wondering if there is some flaw in my queue where a context switch would break it?

// thread-safe queue
// inspired by http://msmvps.com/blogs/vandooren/archive/2007/01/05/creating-a-thread-safe-producer-consumer-queue-in-c-without-using-locks.aspx
// only works with one producer and one consumer
#include <pthread.h>
#include <exception>

template<class T>
class tsqueue
{
    private:
        volatile int m_ReadIndex, m_WriteIndex;
        volatile T *m_Data;
        volatile bool m_Done;
        const int m_Size;
        pthread_mutex_t m_ReadMutex, m_WriteMutex;
        pthread_cond_t m_ReadCond, m_WriteCond;
    public:
        tsqueue(const int &size);
        ~tsqueue();
        void push(const T &elem);
        T pop();
        void terminate();
        bool isDone() const;
};

template <class T>
tsqueue<T>::tsqueue(const int &size) : m_ReadIndex(0), m_WriteIndex(0), m_Size(size), m_Done(false) {
    m_Data = new T[size];
    pthread_mutex_init(&m_ReadMutex, NULL);
    pthread_mutex_init(&m_WriteMutex, NULL);
    pthread_cond_init(&m_WriteCond, NULL);
    pthread_cond_init(&m_WriteCond, NULL);
}

template <class T>
tsqueue<T>::~tsqueue() {
    delete[] m_Data;
    pthread_mutex_destroy(&m_ReadMutex);
    pthread_mutex_destroy(&m_WriteMutex);
    pthread_cond_destroy(&m_ReadCond);
    pthread_cond_destroy(&m_WriteCond);
}


template <class T>
void tsqueue<T>::push(const T &elem) {
    int next = (m_WriteIndex + 1) % m_Size;
    if(next == m_ReadIndex) {
        pthread_mutex_lock(&m_WriteMutex);
        pthread_cond_wait(&m_WriteCond, &m_WriteMutex);
        pthread_mutex_unlock(&m_WriteMutex);
    }
    m_Data[m_WriteIndex] = elem;
    m_WriteIndex = next;
    pthread_cond_signal(&m_ReadCond);
}

template <class T>
T tsqueue<T>::pop() {
    if(m_ReadIndex == m_WriteIndex) {
        pthread_mutex_lock(&m_ReadMutex);
        pthread_cond_wait(&m_ReadCond, &m_ReadMutex);
        pthread_mutex_unlock(&m_ReadMutex);
        if(m_Done && m_ReadIndex == m_WriteIndex) throw "queue empty and terminated";
    }
    int next = (m_ReadIndex +1) % m_Size;
    T elem = m_Data[m_ReadIndex];
    m_ReadIndex = next;
    pthread_cond_signal(&m_WriteCond);
    return elem;
}

template <class T>
void tsqueue<T>::terminate() {
    m_Done = true;
    pthread_cond_signal(&m_ReadCond);
}

template <class T>
bool tsqueue<T>::isDone() const {
    return (m_Done && m_ReadIndex == m_WriteIndex);
}

This could be used like this:

// thread 1
while(cin.get(c)) {
    queue1.push(c);
}
queue1.terminate();


// thread 2
while(!queue1.isDone()) {
    try{ c = queue1.pop(); }
    catch(char const* str){break;}
    cout.put(c);
}

If anyone sees a problem with this, please say so :)

+3  A: 

If this is your actual code, one problem right off the bat is that you're initializing m_WriteCond twice, and not initializing m_ReadCond at all.
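
For reference, here is a minimal sketch of what initializing each condition variable exactly once could look like. `QueueConds` is a hypothetical helper struct, not part of the posted class; it only isolates the two `pthread_cond_init` calls and checks their return codes.

```cpp
#include <pthread.h>
#include <stdexcept>

// Hypothetical holder for the queue's two condition variables.
struct QueueConds {
    pthread_cond_t readCond;
    pthread_cond_t writeCond;

    QueueConds() {
        // Initialize the read condition first...
        if (pthread_cond_init(&readCond, NULL) != 0)
            throw std::runtime_error("pthread_cond_init(readCond) failed");
        // ...then the write condition, each exactly once.
        if (pthread_cond_init(&writeCond, NULL) != 0) {
            pthread_cond_destroy(&readCond);
            throw std::runtime_error("pthread_cond_init(writeCond) failed");
        }
    }

    ~QueueConds() {
        pthread_cond_destroy(&readCond);
        pthread_cond_destroy(&writeCond);
    }
};
```

Signaling or waiting on a condition variable that was never initialized has undefined behavior, so anything from "works by luck" to a hang is possible.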

Michael Burr
Oh wow, completely missed that. I'm surprised this thing didn't blow up. Thanks :)
Mark
+7  A: 

Yes, there are definitely problems here. All your accesses to queue member variables occur outside the mutexes. In fact, I'm not entirely sure what your mutexes are protecting, since they are just around a wait on a condition variable.

Also, it appears that your reader and writer will always operate in lock-step, never allowing the queue to grow beyond one element in size.
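
To make the first point concrete: in the posted `push()` and `pop()`, the index comparison happens before the mutex is taken. If the other thread signals between that check and the call to `pthread_cond_wait`, the signal is lost and the waiter can sleep indefinitely, which would be consistent with the "falls asleep" symptom. The usual pattern is to test the condition while holding the mutex and to wait in a loop. The sketch below shows that pattern on a hypothetical one-shot `Event` class (not from the question), with `runEventDemo` as an illustrative driver.

```cpp
#include <pthread.h>

// Hypothetical one-shot event, used only to illustrate the wait pattern.
class Event {
    pthread_mutex_t m_Mutex;
    pthread_cond_t  m_Cond;
    bool            m_Set;   // the predicate; only touched with m_Mutex held
public:
    Event() : m_Set(false) {
        pthread_mutex_init(&m_Mutex, NULL);
        pthread_cond_init(&m_Cond, NULL);
    }
    ~Event() {
        pthread_cond_destroy(&m_Cond);
        pthread_mutex_destroy(&m_Mutex);
    }
    void set() {
        pthread_mutex_lock(&m_Mutex);
        m_Set = true;                       // change the predicate under the lock
        pthread_cond_signal(&m_Cond);
        pthread_mutex_unlock(&m_Mutex);
    }
    void wait() {
        pthread_mutex_lock(&m_Mutex);
        while (!m_Set)                      // loop: wakeups may be spurious
            pthread_cond_wait(&m_Cond, &m_Mutex);
        pthread_mutex_unlock(&m_Mutex);
    }
};

// Thread entry point: sets the event passed in through arg.
static void *setter(void *arg) {
    static_cast<Event *>(arg)->set();
    return NULL;
}

// Returns true once the waiter has observed the event.
bool runEventDemo() {
    Event ev;
    pthread_t t;
    if (pthread_create(&t, NULL, setter, &ev) != 0)
        return false;
    ev.wait();   // returns whether set() ran before or after this call
    pthread_join(t, NULL);
    return true;
}
```

Because the flag is checked under the same mutex that `set()` holds while changing it, there is no window in which the signal can be missed.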

Greg Hewgill
Only reason I have the mutexes at all is because the pthread_cond_wait requires them. Since I only ever have one reader or writer at a time, no variable should be read or written to concurrently (with or without mutexes).
Mark
Oh, and I'm not sure what you mean by they will operate in lock-step. Why would they? Wouldn't that only happen if thread 1 and thread 2 ran at the exact same speed and were on separate CPUs? Otherwise thread 1 could run several times before thread 2 is ever called.
Mark
A: 

It seems the problem is a race condition: thread 2 can run before thread 1 has done any cin.get(c). You need to make sure the data is initialized, and that when you read from the queue you handle the case where nothing has been entered yet.

Maybe this is me not seeing the rest of the code where this is done though.

Suroot
I think you're missing something. If `pop()` is called first, then `m_ReadIndex == m_WriteIndex` and thread 2 will go to sleep until it's signaled.
Mark
+2  A: 

You should treat this class as a monitor. You should have a "monitor lock" for each queue (a normal mutex). Whenever you enter a method that reads or writes any field in the queue, you should lock this mutex as soon as you enter it. This prevents more than one thread from interacting with the queue at a time. You should release the lock before you wait on a condition and when you leave a method so other threads may enter. Make sure to re-acquire the lock when you are done waiting on a condition.
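
As a rough sketch of the monitor approach described above (not a drop-in replacement for the posted class): one mutex guards all state, and every wait re-checks its condition in a loop. Note that `pthread_cond_wait` itself releases the mutex while waiting and re-acquires it before returning, so no manual unlock is needed around the wait. The name `MonitorQueue`, the `bool pop(T&)` signature, and the use of `std::vector` are my own assumptions. It also assumes a capacity greater than zero and an element type whose copy assignment does not throw.

```cpp
#include <pthread.h>
#include <cstddef>
#include <vector>

// Hypothetical monitor-style bounded queue; all names are illustrative.
template <class T>
class MonitorQueue {
    pthread_mutex_t m_Lock;       // the single "monitor lock"
    pthread_cond_t  m_NotEmpty;   // signaled when an element is added
    pthread_cond_t  m_NotFull;    // signaled when an element is removed
    std::vector<T>  m_Data;       // ring buffer storage
    std::size_t     m_Read;       // index of the oldest element
    std::size_t     m_Count;      // number of stored elements
    bool            m_Done;       // set by terminate()
public:
    explicit MonitorQueue(std::size_t capacity)   // assumes capacity > 0
        : m_Data(capacity), m_Read(0), m_Count(0), m_Done(false) {
        pthread_mutex_init(&m_Lock, NULL);
        pthread_cond_init(&m_NotEmpty, NULL);
        pthread_cond_init(&m_NotFull, NULL);
    }
    ~MonitorQueue() {
        pthread_cond_destroy(&m_NotFull);
        pthread_cond_destroy(&m_NotEmpty);
        pthread_mutex_destroy(&m_Lock);
    }

    void push(const T &elem) {
        pthread_mutex_lock(&m_Lock);
        while (m_Count == m_Data.size())            // full: wait for space
            pthread_cond_wait(&m_NotFull, &m_Lock);
        m_Data[(m_Read + m_Count) % m_Data.size()] = elem;
        ++m_Count;
        pthread_cond_signal(&m_NotEmpty);
        pthread_mutex_unlock(&m_Lock);
    }

    // Returns false once the queue is empty and terminate() was called.
    bool pop(T &out) {
        pthread_mutex_lock(&m_Lock);
        while (m_Count == 0 && !m_Done)             // empty: wait for data
            pthread_cond_wait(&m_NotEmpty, &m_Lock);
        bool ok = (m_Count > 0);
        if (ok) {
            out = m_Data[m_Read];
            m_Read = (m_Read + 1) % m_Data.size();
            --m_Count;
            pthread_cond_signal(&m_NotFull);
        }
        pthread_mutex_unlock(&m_Lock);
        return ok;
    }

    void terminate() {
        pthread_mutex_lock(&m_Lock);
        m_Done = true;
        pthread_cond_broadcast(&m_NotEmpty);        // wake any waiting readers
        pthread_mutex_unlock(&m_Lock);
    }
};
```

With this shape the consumer loop becomes `while (q.pop(c)) cout.put(c);`, and no exception is needed to report termination.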

Jay Conrod
Seems a bit inefficient to lock down the whole queue, no? No reason it shouldn't be able to read and write at the same time, as long as it isn't to the same element. I appreciate the suggestion though :)
Mark
You need to protect the whole shared state. Both readers and writers access both indices, so you actually do need to lock down the whole queue. With two threads it's not much of a problem. If you have lots of readers and writers, look up the readers-writers problem for a more efficient approach.
Jay Conrod
A: 

If you want anything with decent performance, I would strongly suggest dumping your R/W lock and just using a very simple spinlock. Or, if you really think you can get the performance you want with an R/W lock, I would roll your own based on this design (single-word R/W spinlock) from Joe Duffy.
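
For what "a very simple spinlock" might look like, here is a minimal sketch built on `std::atomic_flag`, which assumes a C++11 compiler. It is not Joe Duffy's reader/writer design; `SpinLock`, `SpinDemo`, and `runSpinDemo` are hypothetical names used for illustration.

```cpp
#include <atomic>
#include <pthread.h>

// Hypothetical test-and-set spinlock.
class SpinLock {
    std::atomic_flag m_Flag;
public:
    SpinLock() { m_Flag.clear(); }          // start in the unlocked state
    void lock() {
        // Spin until the previous value was "clear", meaning we own the lock.
        while (m_Flag.test_and_set(std::memory_order_acquire)) {
            // busy-wait
        }
    }
    void unlock() { m_Flag.clear(std::memory_order_release); }
};

// Shared state for the demo: a counter guarded by the spinlock.
struct SpinDemo {
    SpinLock lock;
    long     counter;
};

// Each worker increments the shared counter 100000 times under the lock.
static void *spinWorker(void *arg) {
    SpinDemo *d = static_cast<SpinDemo *>(arg);
    for (int i = 0; i < 100000; ++i) {
        d->lock.lock();
        ++d->counter;
        d->lock.unlock();
    }
    return NULL;
}

// Runs two workers and returns the final counter value.
long runSpinDemo() {
    SpinDemo d;
    d.counter = 0;
    pthread_t a, b;
    pthread_create(&a, NULL, spinWorker, &d);
    pthread_create(&b, NULL, spinWorker, &d);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return d.counter;
}
```

A spinlock like this burns CPU while it waits, so it only pays off when critical sections are very short. It is not a substitute for blocking on a condition variable when the queue is empty or full.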

Matt Davison
Ah, thank you. I'll give this a read! I'm still new to multi-threading.
Mark