I have the following code:

#include <windows.h>
#include <list>

class TimeOutException
{};

template <typename T>
class MultiThreadedBuffer
{
public:
 MultiThreadedBuffer()
 {
  InitializeCriticalSection(&m_csBuffer);
  m_evtDataAvail = CreateEvent(NULL, TRUE, FALSE, NULL); // manual-reset event, initially unsignaled
 }
 ~MultiThreadedBuffer()
 {
  CloseHandle(m_evtDataAvail);
  DeleteCriticalSection(&m_csBuffer);
 }
 void LockBuffer()
 {
  EnterCriticalSection(&m_csBuffer);
 }
 void UnlockBuffer()
 {
  LeaveCriticalSection(&m_csBuffer);
 }
 void Add(T val)
 {
  LockBuffer();
  m_buffer.push_back(val);
  SetEvent(m_evtDataAvail);
  UnlockBuffer();
 }
 T Get(DWORD timeout)
 {
  T val;
  if (WaitForSingleObject(m_evtDataAvail, timeout) == WAIT_OBJECT_0) {
   LockBuffer();

   if (!m_buffer.empty()) {
    val = m_buffer.front();
    m_buffer.pop_front();
   }

   if (m_buffer.empty()) {
    ResetEvent(m_evtDataAvail);
   }

   UnlockBuffer();
  } else {
   throw TimeOutException();
  }
  return val;
 }
 bool IsDataAvail()
 {
  return (WaitForSingleObject(m_evtDataAvail, 0) == WAIT_OBJECT_0);
 }
 std::list<T> m_buffer;
 CRITICAL_SECTION m_csBuffer;
 HANDLE m_evtDataAvail;
};

Unit testing shows that this code works fine when used on a single thread as long as T's default constructor and copy/assignment operators don't throw. Since I'm writing T, that is acceptable.

My problem is the Get method. If there is no data available (i.e. m_evtDataAvail is not set), then a couple of threads can block on the WaitForSingleObject call. When new data becomes available, they all fall through to the LockBuffer() call. Only one will pass, get the data out, and move on. After the UnlockBuffer() call, another thread can move on through and will find that there is no data. Currently it will return the default object.

What I want to happen is for that second thread (and others) to go back to the WaitForSingleObject call. I could add an else block that unlocked and did a goto but that just feels evil.

That solution also adds the possibility of an endless loop, since each trip back would restart the timeout. I could add some code to check the clock on entry and adjust the timeout on each trip back, but then this simple Get method starts to get very complicated.

Any ideas on how to solve these problems while maintaining testability and simplicity?

Oh, for anyone wondering, the IsDataAvail function only exists for testing. It won't be used in production code. The Add and Get are the only methods that will be used in a non-testing environment.

+7  A: 

You need to create an auto-reset event instead of a manual-reset event. This guarantees that if multiple threads are waiting on the event when it is set, only one thread will be released; all other threads will remain in the waiting state. You can create an auto-reset event by passing FALSE as the second parameter of the CreateEvent API. Also, note that this code is not exception safe: after locking the buffer, if some statement throws an exception, your critical section will not be unlocked. Use the RAII principle to ensure that your critical section gets unlocked even in the case of exceptions.
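
Something along these lines (an untested sketch based on the class in the question; CSGuard is just a name I made up for the RAII wrapper):

// Made-up RAII guard so the critical section is released even if an
// exception is thrown while the buffer is locked.
class CSGuard
{
public:
 explicit CSGuard(CRITICAL_SECTION& cs) : m_cs(cs) { EnterCriticalSection(&m_cs); }
 ~CSGuard() { LeaveCriticalSection(&m_cs); }
private:
 CRITICAL_SECTION& m_cs;
 CSGuard(const CSGuard&);            // non-copyable
 CSGuard& operator=(const CSGuard&);
};

// Constructor change: FALSE for the second parameter (bManualReset) makes the
// event auto-reset, so each SetEvent releases at most one waiting thread.
MultiThreadedBuffer()
{
 InitializeCriticalSection(&m_csBuffer);
 m_evtDataAvail = CreateEvent(NULL, FALSE, FALSE, NULL);
}

// In Get(), a guard on the stack unlocks on every exit path, including throws:
// CSGuard guard(m_csBuffer);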

Naveen
First, I have historically used RAII in just these circumstances. I chose not to in this case to increase the testing area. I'm just getting into TDD.
Jere.Jones
Second, BRILLIANT! I changed the if-empty-reset block to if-not-empty-set and every path I can think of is handled properly. Thanks!
Jere.Jones
+1, auto-reset would do. There is one disadvantage of using auto-reset here: if multiple items are added to m_buffer via the Add method, there is a chance that threads are left waiting while m_buffer still has items that haven't been popped yet.
aJ
+4  A: 

You could use a Semaphore object instead of a generic Event object. The semaphore count should be initialized to 0 and incremented by 1 with ReleaseSemaphore each time Add is called. That way the WaitForSingleObject in Get will never release more threads to read from the buffer than there are values in the buffer.
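
Roughly like this (an untested sketch adapted from the class in the question; m_semDataAvail is a made-up member standing in for m_evtDataAvail, and LONG_MAX from <climits> is an assumed maximum count):

// Constructor: the count starts at 0.
m_semDataAvail = CreateSemaphore(NULL, 0, LONG_MAX, NULL);

void Add(T val)
{
 LockBuffer();
 m_buffer.push_back(val);
 UnlockBuffer();
 ReleaseSemaphore(m_semDataAvail, 1, NULL); // one permit per queued item
}

T Get(DWORD timeout)
{
 // Each successful wait consumes exactly one permit, so no more threads
 // get past this point than there are items in the buffer.
 if (WaitForSingleObject(m_semDataAvail, timeout) != WAIT_OBJECT_0)
  throw TimeOutException();

 LockBuffer();
 T val = m_buffer.front();
 m_buffer.pop_front();
 UnlockBuffer();
 return val;
}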

Chris Dodd
Wow. Out of left field comes an outstanding alternative. That is elegant and simple. Friggin' beautiful.
Jere.Jones
+1, looks like a good solution
aJ
+3  A: 

You will always have to code for the case where the event is signaled but there is no data, even WITH auto-reset events. There is a race condition from the moment WaitForSingleObject wakes until LockBuffer is called, and in that interval another thread can pop the data from the buffer. Your code must place WaitForSingleObject in a loop, decreasing the timeout by the time already spent on each loop iteration...
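
Something like this (an untested sketch reusing the names from the question and assuming the auto-reset event from the first answer; GetTickCount does the elapsed-time bookkeeping):

T Get(DWORD timeout)
{
 DWORD start = GetTickCount();
 for (;;) {
  DWORD elapsed = GetTickCount() - start;
  if (elapsed > timeout)
   throw TimeOutException();
  if (WaitForSingleObject(m_evtDataAvail, timeout - elapsed) != WAIT_OBJECT_0)
   throw TimeOutException();

  LockBuffer();
  if (!m_buffer.empty()) {
   T val = m_buffer.front();
   m_buffer.pop_front();
   if (!m_buffer.empty())
    SetEvent(m_evtDataAvail); // more items left: wake another waiter
   UnlockBuffer();
   return val;
  }
  // Lost the race: another thread consumed the item between the wait and
  // the lock. Unlock and wait again with the remaining timeout.
  UnlockBuffer();
 }
}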

As an alternative, may I interest you in more scalable and performant alternatives? Interlocked singly linked lists, the OS thread pool via QueueUserWorkItem, and idempotent processing. Add pushes an entry onto the list and submits a work item. The work item pops an entry and, if it is not NULL, processes it. You can get fancy and add extra logic for the processor to loop and keep state marking its 'active' presence, so that Add does not queue unnecessary work items, but that is not strictly required. For even higher scale and multi-core/multi-CPU load spreading I recommend using queued completion ports. The details are described in Rick Vicik's articles; I have a blog entry that links all 3 at once: High Performance Windows programs.
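
A rough sketch of the push-and-queue pattern (illustration only; Item, ProcessOne, and g_head are made-up names, and the real thing needs error handling):

#include <windows.h>
#include <malloc.h>

struct Item
{
 SLIST_ENTRY entry; // list hook; entries must be MEMORY_ALLOCATION_ALIGNMENT aligned
 int payload;       // whatever the work item needs to process
};

SLIST_HEADER g_head; // InitializeSListHead(&g_head) must be called once at startup

DWORD WINAPI ProcessOne(LPVOID)
{
 // Pop one entry; NULL means another work item already drained the list.
 PSLIST_ENTRY e = InterlockedPopEntrySList(&g_head);
 if (e != NULL) {
  Item* item = CONTAINING_RECORD(e, Item, entry);
  // ... process item->payload ...
  _aligned_free(item);
 }
 return 0;
}

void Add(int payload)
{
 Item* item = static_cast<Item*>(_aligned_malloc(sizeof(Item), MEMORY_ALLOCATION_ALIGNMENT));
 item->payload = payload;
 InterlockedPushEntrySList(&g_head, &item->entry);
 QueueUserWorkItem(ProcessOne, NULL, WT_EXECUTEDEFAULT); // one work item per entry
}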

Remus Rusanu
I really like the idea of a lock-free list, but as far as I can tell, those lists are LIFO and I need FIFO. Or am I missing something?
Jere.Jones
You didn't miss anything. I didn't know you wanted strict FIFO enforced. AFAIK FIFO lock-free lists are not available, because they cannot be maintained with a single InterlockedExchange operation the way LIFO lists can.
Remus Rusanu