I have a very simple queue implementation that wraps a fixed array. It contains peek, enqueue, and dequeue. If peek returns a reference, I've found that it will return conflicting results eventually (conflicting results meaning it will return 2 different values without any intervening dequeues or enqueues). Obviously, this could happen if that reference is held onto and modified, but as far as I can tell, it isn't. In fact, calling peek again gives the expected result.
Below is the code with Windows threading and mutexes. I've also tried this using pthreads on Linux with the same result. I obviously don't understand something... I've dumped the executable and find the only difference between returning a reference or a value is when the memory location is dereferenced. For example:
If a reference is returned, peek contains:
lea eax,[edx+ecx*4+8]
And then in the consumer thread:
cmp dword ptr [eax],1
But, if a value is returned, peek contains:
mov eax,dword ptr [edx+ecx*4+8]
And then in the consumer thread:
cmp eax,1
Thanks!
#include <iostream>
#include <windows.h>
typedef void *(thread_func_type)(void *);
void start_thread(HANDLE &thread, thread_func_type *thread_func, void *arg)
{
DWORD id;
thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)thread_func, arg, 0, &id);
if (thread == NULL) {
std::cerr << "ERROR: failed to create thread\n";
::exit(1);
}
}
void join_thread(HANDLE &thread)
{
WaitForSingleObject(thread, INFINITE);
}
class ScopedMutex
{
HANDLE &mutex;
public:
ScopedMutex(HANDLE &mutex_) : mutex(mutex_)
{
WORD result = WaitForSingleObject(mutex, INFINITE);
if (result != WAIT_OBJECT_0) {
std::cerr << "ERROR: failed to lock mutex\n";
::exit(1);
}
};
~ScopedMutex()
{
ReleaseMutex(mutex);
};
};
template <typename T, unsigned depth>
class Queue
{
unsigned head, tail;
bool full;
T data[depth];
HANDLE mutex;
public:
Queue() : head(0), tail(0), full(false)
{
mutex = CreateMutex(NULL, 0, NULL);
if (mutex == NULL) {
std::cerr << "ERROR: could not create mutex.\n";
::exit(1);
}
};
T &peek()
{
while (true) {
{
ScopedMutex local_lock(mutex);
if (full || (head != tail))
return data[tail];
}
Sleep(0);
}
};
void enqueue(const T &t)
{
while (true) {
{
ScopedMutex local_lock(mutex);
if (!full) {
data[head++] = t;
head %= depth;
full = (head == tail);
return;
}
}
Sleep(0);
}
};
void dequeue()
{
while (true) {
{
ScopedMutex local_lock(mutex);
if (full || (head != tail)) {
++tail;
tail %= depth;
full = false;
return;
}
}
Sleep(0);
}
};
};
template <unsigned num_vals, int val, unsigned depth>
void *
producer(void *arg)
{
Queue<int, depth> &queue = *static_cast<Queue<int, depth> *>(arg);
for (unsigned i = 0; i < num_vals; ++i) {
queue.enqueue(val);
}
std::cerr << "producer " << val << " exiting.\n";
return NULL;
}
template <unsigned num_vals, int val, unsigned depth>
void *
consumer(void *arg)
{
Queue<int, depth> &queue = *static_cast<Queue<int, depth> *>(arg);
for (unsigned i = 0; i < num_vals; ++i) {
while (queue.peek() != val)
Sleep(0);
if (queue.peek() != val) {
std::cerr << "ERROR: (" << val << ", " << queue.peek() << ")" << std::endl;
std::cerr << "But peeking again gives the right value " << queue.peek() << std::endl;
::exit(1);
}
queue.dequeue();
}
return NULL;
}
int
main(int argc, char *argv[])
{
const unsigned depth = 10;
const unsigned num_vals = 100000;
Queue<int, depth> queue;
HANDLE p1, p2, c1, c2;
start_thread(p1, producer<num_vals, 1, depth>, &queue);
start_thread(p2, producer<num_vals, 2, depth>, &queue);
start_thread(c1, consumer<num_vals, 1, depth>, &queue);
start_thread(c2, consumer<num_vals, 2, depth>, &queue);
join_thread(p1);
join_thread(p2);
join_thread(c1);
join_thread(c2);
}