I have a very simple queue implementation that wraps a fixed array. It contains peek, enqueue, and dequeue. If peek returns a reference, I've found that it will return conflicting results eventually (conflicting results meaning it will return 2 different values without any intervening dequeues or enqueues). Obviously, this could happen if that reference is held onto and modified, but as far as I can tell, it isn't. In fact, calling peek again gives the expected result.

Below is the code with Windows threading and mutexes. I've also tried this using pthreads on Linux with the same result. I obviously don't understand something... I've disassembled the executable and found that the only difference between returning a reference and returning a value is the point at which the memory location is dereferenced. For example:

If a reference is returned, peek contains:
lea eax,[edx+ecx*4+8]
And then in the consumer thread:
cmp dword ptr [eax],1

But, if a value is returned, peek contains:
mov eax,dword ptr [edx+ecx*4+8]
And then in the consumer thread:
cmp eax,1

Thanks!

#include <cstdlib>   // ::exit
#include <iostream>
#include <windows.h>

typedef void *(thread_func_type)(void *);

void start_thread(HANDLE &thread, thread_func_type *thread_func, void *arg)
{
    DWORD id;
    thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)thread_func, arg, 0, &id);
    if (thread == NULL) {
        std::cerr << "ERROR: failed to create thread\n";
        ::exit(1);
    }
}

void join_thread(HANDLE &thread)
{
    WaitForSingleObject(thread, INFINITE);
}

class ScopedMutex
{
    HANDLE &mutex;

public:

    ScopedMutex(HANDLE &mutex_) : mutex(mutex_)
    {
        DWORD result = WaitForSingleObject(mutex, INFINITE);  // WaitForSingleObject returns a DWORD
        if (result != WAIT_OBJECT_0) {
            std::cerr << "ERROR: failed to lock mutex\n";
            ::exit(1);
        }
    };

    ~ScopedMutex()
    {
        ReleaseMutex(mutex);
    };
};

template <typename T, unsigned depth>
class Queue
{
    unsigned head, tail;
    bool full;
    T data[depth];
    HANDLE mutex;

public:

    Queue() : head(0), tail(0), full(false)
    {
        mutex = CreateMutex(NULL, 0, NULL);
        if (mutex == NULL) {
            std::cerr << "ERROR: could not create mutex.\n";
            ::exit(1);
        }
    };

    T &peek()
    {
        while (true) {
            {
                ScopedMutex local_lock(mutex);
                if (full || (head != tail))
                    return data[tail];
            }
            Sleep(0);
        }
    };

    void enqueue(const T &t)
    {
        while (true) {
            {
                ScopedMutex local_lock(mutex);
                if (!full) {
                    data[head++] = t;
                    head %= depth;
                    full = (head == tail);
                    return;
                }
            }
            Sleep(0);
        }
    };

    void dequeue()
    {
        while (true) {
            {
                ScopedMutex local_lock(mutex);
                if (full || (head != tail)) {
                    ++tail;
                    tail %= depth;
                    full = false;
                    return;
                }
            }
            Sleep(0);
        }
    };
};

template <unsigned num_vals, int val, unsigned depth>
void *
producer(void *arg)
{
    Queue<int, depth> &queue = *static_cast<Queue<int, depth> *>(arg);
    for (unsigned i = 0; i < num_vals; ++i) {
        queue.enqueue(val);
    }
    std::cerr << "producer " << val << " exiting.\n";
    return NULL;
}

template <unsigned num_vals, int val, unsigned depth>
void *
consumer(void *arg)
{
    Queue<int, depth> &queue = *static_cast<Queue<int, depth> *>(arg);
    for (unsigned i = 0; i < num_vals; ++i) {
        while (queue.peek() != val)
            Sleep(0);
        if (queue.peek() != val) {
            std::cerr << "ERROR: (" << val << ", " << queue.peek() << ")" << std::endl;
            std::cerr << "But peeking again gives the right value " << queue.peek() << std::endl;
            ::exit(1);
        }
        queue.dequeue();
    }
    return NULL;
}

int
main(int argc, char *argv[])
{
    const unsigned depth = 10;
    const unsigned num_vals = 100000;
    Queue<int, depth> queue;
    HANDLE p1, p2, c1, c2;
    start_thread(p1, producer<num_vals, 1, depth>, &queue);
    start_thread(p2, producer<num_vals, 2, depth>, &queue);
    start_thread(c1, consumer<num_vals, 1, depth>, &queue);
    start_thread(c2, consumer<num_vals, 2, depth>, &queue);
    join_thread(p1);
    join_thread(p2);
    join_thread(c1);
    join_thread(c2);
}
+3  A: 

Peek returns a reference into the middle of the array while other threads are actively modifying that memory. Reading anything through that reference is undefined behavior. You can't peek inside; your dequeue should remove the element and return a copy.
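As a rough sketch of that suggestion (not the asker's code): the variant below copies the element out while the lock is still held, so no caller ever touches the array outside the lock. It assumes standard C++11 `std::mutex` in place of the Win32 mutex, and the names `CopyingQueue`, `try_peek`, `try_enqueue`, and `try_dequeue` are made up for illustration. The operations are non-blocking to keep it short; the retry loops from the question could be layered on top.

```cpp
#include <cstddef>
#include <mutex>

// Hypothetical variant of the question's Queue. std::mutex stands in for the
// Win32 mutex, and every accessor copies data while holding the lock.
template <typename T, std::size_t depth>
class CopyingQueue
{
    std::size_t head = 0, tail = 0;  // head: next write slot, tail: next read slot
    bool full = false;
    T data[depth];
    std::mutex mutex;

    // Caller must already hold the mutex.
    bool empty_locked() const { return !full && head == tail; }

public:
    // Copies the front element into 'out' while the lock is held.
    bool try_peek(T &out)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (empty_locked())
            return false;
        out = data[tail];
        return true;
    }

    // Stores a copy of 't' in the next free slot, or fails if the queue is full.
    bool try_enqueue(const T &t)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (full)
            return false;
        data[head] = t;
        head = (head + 1) % depth;
        full = (head == tail);
        return true;
    }

    // Removes the front element and hands back a copy, all under one lock.
    bool try_dequeue(T &out)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (empty_locked())
            return false;
        out = data[tail];
        tail = (tail + 1) % depth;
        full = false;
        return true;
    }
};
```

With this shape a consumer only ever holds its own copy of the value, so a later enqueue into the same slot cannot change what it already read.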

Remus Rusanu
In the example above no thread stores or uses the peek'd reference. Peek is only compared to a constant. I understand returning a reference is dangerous with multiple threads, but in the above code nothing uses the returned reference and peek still returns different values. What am I missing? Where do multiple threads actively modify the same memory location?
voxmea
"Peek is only compared to a constant": there is your stored reference. That comparison occurs after you returned from the function, therefore *after you exited the lock scope*. So in effect you are looking into the array outside lock boundaries, hence you get undefined behavior. 'Reference' is really nothing but a fancy name for 'pointer'. You have a pointer into the array, pointing to memory that is being changed. Read through that pointer and you'll get unpredictable values.
Remus Rusanu
Thanks Remus! If I understand: even though only 1 thread should be outside of the consumer while loop at a time (while peek != val), the peek dereference is happening outside of the lock, therefore the data read is undefined?
voxmea
No thread should read *or* write inside the array outside the lock scope. By returning a reference, you return a pointer *into* the array. But you have already released the lock, so anyone using the reference breaks the locking contract and gets unpredictable results.
Remus Rusanu
This applies not only to the comparison, but to every other use of peek(), including the ostream writes. Say a consumer calls peek() and gets a reference to 'slot' 1 in the array, which happens to hold the value '1'. A consumer dequeues; the value is still '1', but the slot is free. P1 enqueues '2', and a read through the peek'd reference would now yield '2'. A consumer dequeues again, and P1 enqueues '3'. A read through the peek'd reference would now yield '3'. A consumer dequeues, and P2 enqueues '2'. Read again, the peek'd reference has changed back to '2'.
Remus Rusanu
By returning a reference you allow a reader to see all the transient states, completely unprotected. When T is an int, at least you'll see atomic, consistent values. But for more complex values you can see half of the value from one operation and half from another, as you may catch a non-atomic write right in the middle.
Remus Rusanu
A: 

Your threads might be taking turns, round-robin, which would make your consumers happen to consume the right number every time.

Instead, have one consumer grab the reference to a number and compare it against a copy like:

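// Reference into the queue's array; the lock is already released when peek() returns.
// volatile forces a fresh load on every comparison instead of a cached register value.
volatile int& valRef = queue.peek();
int valCopy = valRef;             // snapshot of the slot at this moment
while (valRef == valCopy) {}      // spins until another thread overwrites the slot
printf("Aha! It IS unsafe!\n");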

Eventually, one of the producers will overwrite the memory you reference while you're doing the comparison.

thebretness
+2  A: 

Maybe it played out like this:

The queue is totally full. Because of some freak of scheduling Producer #2 ran twice in a row so the next two slots in the queue contain these values: 2, 2

  1. Consumer thread #1 is in its spin loop and has just called peek(). It returns a reference to the first slot. But during the time between the lea instruction and the cmp instruction:
  2. Consumer thread #2 calls dequeue(). This frees up the slot that Consumer #1's peek() just returned, allowing a producer thread to proceed. The second value 2 is now at the front of the queue (the slot that tail indexes).
  3. Producer thread #1 now overwrites the first slot with the value 1. Because the queue is circular, that first slot now holds the most recently enqueued element, at the back of the queue.

    Those two slots now contain these values: 1, 2

  4. Back in Consumer thread #1, the cmp instruction happens, you see the desired value and exit the while loop

  5. Consumer #1 calls peek() again, and sees the wrong value. 


When peek() returned a copy you didn't see this race condition because Consumer #1 is holding the Mutex when it retrieves the value. When peek() returned a reference you are retrieving the value without holding the Mutex and are therefore at the mercy of the CPU's instruction scheduler and the OS's thread scheduler.
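The interleaving above can be replayed deterministically in a single thread, which makes the stale reference easy to see. This is only an illustrative sketch: `ReplayQueue`, `ReplayResult`, and `replay_stale_reference` are hypothetical names, the locking is left out on purpose because each "thread" is just the next statement, and the depth is reduced to 2 so the wraparound happens immediately.

```cpp
#include <cstddef>

// Lock-free, single-threaded stand-in for the question's Queue, used only to
// replay the interleaving step by step. All names here are hypothetical.
template <typename T, std::size_t depth>
struct ReplayQueue
{
    std::size_t head = 0, tail = 0;  // head: next write slot, tail: next read slot
    bool full = false;
    T data[depth] = {};

    T &peek() { return data[tail]; }  // returns a reference, as in the question

    void enqueue(const T &t)
    {
        data[head] = t;
        head = (head + 1) % depth;
        full = (head == tail);
    }

    void dequeue()
    {
        tail = (tail + 1) % depth;
        full = false;
    }
};

struct ReplayResult
{
    int before;       // value behind the reference when peek() returned
    int after;        // value behind the same reference after the other threads ran
    int second_peek;  // value reported by a fresh peek()
};

inline ReplayResult replay_stale_reference()
{
    ReplayQueue<int, 2> q;
    q.enqueue(2);              // producer #2 runs twice: the slots hold 2, 2
    q.enqueue(2);              // the queue is now full

    int &ref = q.peek();       // consumer #1: peek() returns slot 0 (the lea)
    ReplayResult r;
    r.before = ref;            // 2, not the value consumer #1 is waiting for

    q.dequeue();               // consumer #2 removes the element in slot 0
    q.enqueue(1);              // producer #1 reuses slot 0 for the value 1

    r.after = ref;             // consumer #1's delayed read (the cmp) now sees 1
    r.second_peek = q.peek();  // a fresh peek() reports the real front: 2
    return r;
}
```

Calling `replay_stale_reference()` yields `before == 2`, `after == 1`, and `second_peek == 2`: the same reference reports two different values, and the second peek() disagrees with what the comparison saw, which matches the error message in the question.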
 

Brian Sandlin
Thanks Brian! Duh, I see now that the state of the queue can change between the time that peek returns an address and the consumer thread actually loads the memory location. The address returned from peek may not be valid (may not be the tail location) by the time the consumer thread uses it.
voxmea