I would like to implement a producer/consumer scenario that obeys interfaces that are roughly:

class Consumer {
private:
    vector<char> read(size_t n) {
        // If the internal buffer has `n` elements, then dequeue them
        // Otherwise wait for more data and try again
    }
public:
    void run() {
        read(10);
        read(4839);
        // etc
    }
    void feed(const vector<char> &more) {
        // Safely queue the data
        // Notify `read` that there is now more data
    }
};

In this case, `feed` and `run` will run on separate threads, and `read` should be a blocking read (like `recv` and `fread`). Obviously, I will need some kind of mutual exclusion on my deque, and I will need some kind of notification system to inform `read` to try again.

I hear condition variables are the way to go, but all my multithreading experience is with Windows, and I am having a hard time wrapping my head around them.

Thanks for any help!

(Yes, I know it's inefficient to return vectors. Let's not get into that.)

+1  A: 

I'll throw down some semi-pseudo-code. Here are my comments:

1) The locking here is very coarse-grained. If you need faster access, you will want to rethink your data structures. The STL is not thread-safe.

2) `pthread_mutex_lock` will block until the mutex lets the thread through. A mutex lets one thread through at a time via the lock/unlock mechanism; there is no need for polling or for some kind of exception-esque structure.

3) This is a pretty syntactically hacky cut at the problem. I'm not being precise with the API or C++ syntax, but I believe it gives a semantically correct solution.

4) Edited in response to comment.

class piper
{
    pthread_mutex_t queuemutex;
    pthread_mutex_t readymutex;
    bool isReady; // init to false by constructor

    // whatever else (myqueue, etc.)
};

std::vector<char> piper::read(size_t n)
{
    // whatever
    std::vector<char> return_queue_vector;
    pthread_mutex_lock(&queuemutex);
    if (myqueue.size() >= n)
    {
        return_queue_vector.push_back(/* you know what to do here */);

        pthread_mutex_lock(&readymutex);
        isReady = false;
        pthread_mutex_unlock(&readymutex);
    }
    pthread_mutex_unlock(&queuemutex);
    return return_queue_vector;
}

void piper::push_em_in()
{
    // more whatever
    pthread_mutex_lock(&queuemutex);
    // push push push
    if (myqueue.size() >= n)
    {
        pthread_mutex_lock(&readymutex);
        isReady = true;
        pthread_mutex_unlock(&readymutex);
    }
    pthread_mutex_unlock(&queuemutex);
}
Paul Nathan
Good start, but remember that I want my read to succeed. There is no guarantee that `push_em_in` will dump enough data for that to happen. So the read will need to wait till there is enough. It's that loop that I want to make sure is efficient (non-spinning).
Frank Krueger
You could also use RAII to make sure your lock()/unlock() pairing is exception-safe.
Martin York
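The RAII idea Martin York mentions can be sketched like this (the class name and details are my own; the next answer uses the same idea under the name LockThread):

```cpp
#include <pthread.h>
#include <cassert>

// Sketch of an RAII lock guard: the constructor locks, the destructor
// unlocks, so the mutex is released even if the guarded code throws.
class ScopedLock {
public:
    explicit ScopedLock(pthread_mutex_t& m) : mutex(m) {
        pthread_mutex_lock(&mutex);
    }
    ~ScopedLock() {
        pthread_mutex_unlock(&mutex);
    }
private:
    ScopedLock(const ScopedLock&);            // non-copyable
    ScopedLock& operator=(const ScopedLock&); // non-assignable
    pthread_mutex_t& mutex;
};
```

With a guard like this, `read` and `push_em_in` can declare one at the top of the function body and never call unlock by hand on any exit path.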
@Frank, took another hack at the concept. Are you following how to use the pthread mutex better now?
Paul Nathan
Definitely following on the mutex stuff. But again, I need `read` to succeed - it should be a blocking read. Yours can still fail. I appreciate you putting effort into this however! Wish I could vote you up a second time!
Frank Krueger
+2  A: 

Found some tutorial stuff here.

tvanfosson
+7  A: 

This code is not production ready. No error checking is done on the results of any library calls.

I have wrapped the lock/unlock of the mutex in LockThread so it is exception safe. But that's about it.

In addition, if I were doing this seriously I would wrap the mutex and condition variables inside objects so they cannot be abused inside other methods of Consumer. But as long as you take note that the lock must be acquired before you use the condition variable (in any way), this simple situation can stand as is.

Out of interest, have you checked the Boost threading library?

#include <iostream>
#include <vector>
#include <pthread.h>

class LockThread
{
    public:
    LockThread(pthread_mutex_t& m)
        :mutex(m)
    {
        pthread_mutex_lock(&mutex);
    }
    ~LockThread()
    {
        pthread_mutex_unlock(&mutex);
    }
    private:
        pthread_mutex_t& mutex;
};
class Consumer
{
    pthread_mutex_t     lock;
    pthread_cond_t      cond;
    std::vector<char>   unreadData;
    public:
    Consumer()
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&cond,NULL);
    }
    ~Consumer()
    {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&lock);
    }

    private:
        std::vector<char> read(size_t n)
        {
            LockThread  locker(lock);
            while (unreadData.size() < n)
            {
                // Must wait until we have n char.
                // This is a while loop because feed may not put enough in.

                // pthread_cond_wait() releases the lock.
                // The thread will not be allowed to continue until
                // signal is called and this thread re-acquires the lock.

                pthread_cond_wait(&cond,&lock);

                // Once released from the condition you will have re-acquired the lock.
                // Thus feed() must have exited and released the lock first.
            }

            /*
             * Not sure if this is exactly what you wanted.
             * But the data is copied out of the thread safe buffer
             * into something that can be returned.
             */
            std::vector<char>   result(n); // init result with size n
            std::copy(&unreadData[0],
                      &unreadData[n],
                      &result[0]);

            unreadData.erase(unreadData.begin(),
                             unreadData.begin() + n);
            return (result);
        }
public:
    void run()
    {
        read(10);
        read(4839);
        // etc
    }
    void feed(const std::vector<char> &more)
    {
        LockThread  locker(lock);

        // Once we acquire the lock we can safely modify the buffer.
        std::copy(more.begin(),more.end(),std::back_inserter(unreadData));

        // Only signal the thread if you have the lock
        // Otherwise race conditions happen.
        pthread_cond_signal(&cond);

        // destructor releases the lock and thus allows read thread to continue.
    }
};


int main()
{
    Consumer    c;
}
Martin York
This looks very nice. One note (just a refinement), but most sites say you need to protect the condition variable itself with a mutex in order to prevent race conditions. Multithreading is fun, isn't it?
Frank Krueger
The condition variable is protected by a mutex. In both cases read() and feed() you must acquire the lock before you can do anything with the condition variable.
Martin York
Sorry about that. I missed it in your code. Very nice.
Frank Krueger
And to answer your question: I'm not using boost because of its size. Every time I think of using it, the download size/my inability to read through 10 layers of macros/and its build system with all the compiler switches creates a sick feeling in my stomach. It's ugly.
Frank Krueger
Signalling a condition variable is ok if it doesn't happen inside the mutex, but is generally advised against. It only really matters if the order waiters are woken up is important (eg. to prevent starvation). Usually it isn't.
Greg Rogers
You don't have to build boost to use the threads library, it is all in headers
1800 INFORMATION
@Greg Rogers: I would argue it usually is. Especially since the wait can be released by factors other than an explicit call to pthread_cond_signal(). If that happened, the reader could be released while the feeder is modifying unreadData. Then things become impossible to reproduce/debug.
Martin York
It is a harmless race condition though. Even if the reader is woken up, it still blocks waiting to get the mutex to check if there is something there and copy the data out.
Greg Rogers
@Greg: Race conditions are never harmless as they make the code non-deterministic.
Martin York
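For concreteness, the signal-outside-the-lock variant Greg Rogers describes would look something like this (a sketch with my own function name, not code from either answer):

```cpp
#include <pthread.h>
#include <algorithm>
#include <iterator>
#include <vector>
#include <cassert>

// Sketch: the producer appends under the mutex, then signals after
// unlocking. A correctly written reader re-checks its predicate in a
// loop around pthread_cond_wait(), so an "early" wake-up is re-checked
// under the lock; the debate above is about wake-up ordering and
// debuggability, not about the safety of the buffer contents.
void feed_signal_outside(pthread_mutex_t* lock,
                         pthread_cond_t* cond,
                         std::vector<char>& unreadData,
                         const std::vector<char>& more) {
    pthread_mutex_lock(lock);
    std::copy(more.begin(), more.end(), std::back_inserter(unreadData));
    pthread_mutex_unlock(lock);

    // Signalling here means the woken reader can usually acquire the
    // mutex immediately instead of blocking on one the signaller holds.
    pthread_cond_signal(cond);
}
```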
+2  A: 

I tend to use what I call a "Synchronized Queue". I wrap the normal queue and use a Semaphore class both for locking and for making read block, just as you desire:

#ifndef SYNCQUEUE_20061005_H_
#define SYNCQUEUE_20061005_H_

#include <queue>
#include "Semaphore.h"

// similar, but slightly simpler interface to std::queue
// this queue implementation will serialize pushes and pops
// and block on a pop while empty (as opposed to throwing an exception)
// it also locks as necessary on insertion and removal to avoid race
// conditions

template <class T, class C = std::deque<T> > class SyncQueue {
protected:
    std::queue<T, C> m_Queue;
    Semaphore   m_Semaphore;
    Mutex    m_Mutex;

public:
    typedef typename std::queue<T, C>::value_type value_type;
    typedef typename std::queue<T, C>::size_type size_type;

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {}

    bool empty() const    { return m_Queue.empty(); }
    size_type size() const   { return m_Queue.size(); }

    void push(const value_type& x);
    value_type pop();
};

template <class T, class C>
void SyncQueue<T, C>::push(const typename SyncQueue<T, C>::value_type &x) {
    // atomically push item
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived
    m_Semaphore.v();
}

template <class T, class C>
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() {
    // block until we have at least one item
    m_Semaphore.p();

    // atomically read and pop front item
    m_Mutex.lock();
    value_type ret = m_Queue.front();
    m_Queue.pop();
    m_Mutex.unlock();

    return ret;
}

#endif

You can implement semaphores and mutexes with the appropriate primitives in your threading implementation.
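Since the answer leaves Semaphore unspecified, here is one hedged sketch of a counting semaphore with the p()/v() interface used above, built from a pthread mutex and condition variable (the member names are my own guesses at the interface):

```cpp
#include <pthread.h>
#include <cassert>

// Hypothetical counting semaphore matching the p()/v() calls in SyncQueue.
// v() increments the count and wakes one waiter; p() blocks until the
// count is positive, then decrements it.
class Semaphore {
public:
    explicit Semaphore(int initial) : count(initial) {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&cond, NULL);
    }
    ~Semaphore() {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&lock);
    }
    void v() {                                // release: one more item available
        pthread_mutex_lock(&lock);
        ++count;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
    }
    void p() {                                // acquire: wait for an item
        pthread_mutex_lock(&lock);
        while (count == 0)
            pthread_cond_wait(&cond, &lock);  // drops the lock while waiting
        --count;
        pthread_mutex_unlock(&lock);
    }
private:
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             count;
};
```

Note the while loop around pthread_cond_wait: a semaphore built this way is immune to spurious wakeups because the count is always re-checked under the lock.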

NOTE: this implementation is an example for single elements in a queue, but you could easily wrap it with a function which buffers results until N have been provided. Something like this, if it is a queue of chars:

std::vector<char> func(size_t size) {
    std::vector<char> result;
    while(result.size() != size) {
        result.push_back(my_sync_queue.pop());
    }
    return result;
}
Evan Teran
+1  A: 

Just for fun, here is a quick and dirty implementation using Boost. It uses pthreads under the hood on platforms that support them, and on Windows it uses native Windows primitives.

boost::mutex access;
boost::condition cond;

// consumer
data read()
{
  boost::mutex::scoped_lock lock(access);
  // this blocks until the data is ready
  cond.wait(lock);

  // queue is ready
  return data_from_queue();
}

// producer
void push(data)
{
  boost::mutex::scoped_lock lock(access);
  // add data to queue

  if (queue_has_enough_data())
    cond.notify_one();  
}
1800 INFORMATION
The condition is only notified if there is enough data, so the loop shouldn't be necessary. You should read up on Boost threads and condition variables; the code is correct and there is no deadlock.
1800 INFORMATION
That is to say, the condition variable behaves nicely and releases the lock before it blocks.
1800 INFORMATION
+1  A: 

For even more fun, here is my final version. STL-ized for no good reason. :-)

#include <algorithm>
#include <deque>
#include <pthread.h>

template<typename T>
class MultithreadedReader {
    std::deque<T>   buffer;
    pthread_mutex_t moreDataMutex;
    pthread_cond_t  moreDataCond;

protected:
    template<typename OutputIterator>
    void read(size_t count, OutputIterator result) {
        pthread_mutex_lock(&moreDataMutex);

        while (buffer.size() < count) {
            pthread_cond_wait(&moreDataCond, &moreDataMutex);
        }
        std::copy(buffer.begin(), buffer.begin() + count, result);
        buffer.erase(buffer.begin(), buffer.begin() + count);

        pthread_mutex_unlock(&moreDataMutex);
    }

public:
    MultithreadedReader() {
        pthread_mutex_init(&moreDataMutex, 0);
        pthread_cond_init(&moreDataCond, 0);
    }

    ~MultithreadedReader() {
        pthread_cond_destroy(&moreDataCond);
        pthread_mutex_destroy(&moreDataMutex);
    }

    template<typename InputIterator>
    void feed(InputIterator first, InputIterator last) {
        pthread_mutex_lock(&moreDataMutex);

        buffer.insert(buffer.end(), first, last);
        pthread_cond_signal(&moreDataCond);

        pthread_mutex_unlock(&moreDataMutex);
    }
};
Frank Krueger
@Frank: why is read() protected?
Jinx
The class was designed as a base class whose subtype does the reading itself and just wants to be fed. It's a streaming protocol where the class is like a little Unix app.
Frank Krueger
A: 

GLib Asynchronous Queues provide the locking and the blocking on reads from an empty queue that you are looking for. See http://library.gnome.org/devel/glib/2.20/glib-Asynchronous-Queues.html. You can combine them with GThreads or GThreadPools.

bobmcn