Hi,

I want to implement a parallel read/process/write pipeline that works line by line, based on boost::thread, but the current version behaves unpredictably. The following test reads a CSV file by filling a (concurrent) read queue, whose contents are simply transferred to a write queue and written to an output file (no processing for now).

Problems encountered :

  • On both Windows and Unix, the program randomly never ends (~3/5 runs) and occasionally raises a SIGSEGV (~1/100 runs).
  • On Unix, there are many more errors: SIGABRT when the thread is created, or "memory clobbered before allocated block" (also ending in SIGABRT) after creation, at a random point between 1 and ~15 lines in.

I HATE to dump the problem and the code and let others answer (I am sometimes on your side of these topics), but believe me, I can't think of anything else to fix (with threads, debugging is a nightmare), so I apologize in advance. Here it is:

Main.cpp :

#include "io.hpp"

#include <cstdlib>   // EXIT_SUCCESS
#include <iostream>

int main(int argc, char *argv[]) {
    CSV::Reader reader;
    CSV::Writer writer;

    if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
        CSV::Row row;

        reader.run(); //Reads the CSV file and fills the read queue
        writer.run(); //Reads the to-be-written queue and writes it to a txt file

        //The loop is supposed to end only if the reader is finished and empty
        while(!(reader.is_finished() && reader.empty())) {
            //Transfers line by line from the read to the to-be-written queues
            reader.wait_and_pop(row);
            writer.push(row);
        }
        //The reader will likely finish before the writer, so he has to finish his queue before continuing.
        writer.finish(); 
    }
    else {
        std::cout << "File error";
    }

    return EXIT_SUCCESS;
}

Io.hpp :

#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED

#include "threads.hpp"

#include <fstream>
#include <string>
#include <vector>

namespace CSV {
    class Row {
        std::vector<std::string> m_data;

        friend class Iterator;
        friend void write_row(Row const &row, std::ostream &stream);

        void read_next(std::istream& csv);

        public:
            inline std::string const& operator[](std::size_t index) const {
                return m_data[index];
            }
            inline std::size_t size() const {
                return m_data.size();
            }
    };

    /** Reading *************************************************************************/

    class Iterator {
        public:
            Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
                ++(*this);
            }
            Iterator() : m_csv(NULL) {}

            //Pre-Increment
            Iterator& operator++() {
                if (m_csv != NULL) {
                    m_row.read_next(*m_csv);
                    m_csv = m_csv->good() ? m_csv : NULL;
                }

                return *this;
            }
            inline Row const& operator*() const {
                return m_row;
            }

            inline bool operator==(Iterator const& rhs) {
                return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
            }
            inline bool operator!=(Iterator const& rhs) {
                return !((*this) == rhs);
            }
        private:
            std::istream* m_csv;
            Row m_row;
    };

    class Reader : public Concurrent_queue<Row>, public Thread {
        std::ifstream m_csv;

        Thread_safe_value<bool> m_finished;

        void work() {
            if(!!m_csv) {
                for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
                    push(*it);
                }
                m_finished.set(true);
            }
        }

    public:
        Reader() {
            m_finished.set(false);
        }

        inline bool open(std::string path) {
            m_csv.open(path.c_str());

            return !!m_csv;
        }

        inline bool is_finished() {
            return m_finished.get();
        }
    };

    /** Writing ***************************************************************************/

    void write_row(Row const &row, std::ostream &stream);

    //Is m_finishing really thread-safe ? By the way, is it mandatory ?
    class Writer : public Concurrent_queue<Row>, public Thread {
        std::ofstream m_csv;

        Thread_safe_value<bool> m_finishing;

        void work() {
            if(!!m_csv) {
                CSV::Row row;

                while(!(m_finishing.get() && empty())) {
                    wait_and_pop(row);
                    write_row(row, m_csv);
                }
            }
        }

    public:
        Writer() {
            m_finishing.set(false);
        }

        inline void finish() {
            m_finishing.set(true);
            catch_up();
        }

        inline bool open(std::string path) {
            m_csv.open(path.c_str());

            return !!m_csv;
        }
    };
}

#endif

Io.cpp :

#include "io.hpp"

#include <algorithm>  // std::copy
#include <iterator>   // std::ostream_iterator
#include <string>

#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>

void CSV::Row::read_next(std::istream& csv) {
    std::string row;
    std::getline(csv, row);

    boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
    m_data.assign(tokenizer.begin(), tokenizer.end());
}

void CSV::write_row(Row const &row, std::ostream &stream) {
    std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
    stream << std::endl;
}

Threads.hpp :

#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED

#include <boost/bind.hpp>
#include <boost/thread.hpp>

class Thread {
protected:
    boost::thread *m_thread;

    virtual void work() = 0;

    void do_work() {
        work();
    }

public:
    Thread() : m_thread(NULL) {}
    virtual ~Thread() {
        catch_up();
        if(m_thread != NULL) {
            delete m_thread;
        }
    }

    inline void catch_up() {
        if(m_thread != NULL) {
            m_thread->join();
        }
    }

    void run() {
        m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
    }
};

/** Thread-safe datas **********************************************************/

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

template <class T>
class Thread_safe_value : public boost::noncopyable {
    T m_value;
    boost::mutex m_mutex;

    public:
        T const &get() {
            boost::mutex::scoped_lock lock(m_mutex);
            return m_value;
        }
        void set(T const &value) {
            boost::mutex::scoped_lock lock(m_mutex);
            m_value = value;
        }
};

template<typename Data>
class Concurrent_queue {
    std::queue<Data> m_queue;
    mutable boost::mutex m_mutex;
    boost::condition_variable m_cond;

public:
    void push(Data const& data) {
        boost::mutex::scoped_lock lock(m_mutex);
        m_queue.push(data);
        lock.unlock();
        m_cond.notify_one();
    }

    bool empty() const {
        boost::mutex::scoped_lock lock(m_mutex);
        return m_queue.empty();
    }

    void wait_and_pop(Data& popped) {
        boost::mutex::scoped_lock lock(m_mutex);
        while(m_queue.empty()) {
            m_cond.wait(lock);
        }

        popped = m_queue.front();
        m_queue.pop();
    }
};

#endif // THREADS_HPP_INCLUDED

This project is important and I would really appreciate it if you could help me out =)

Thanks in advance.

Regards,

Mister Mystère.

A: 

After a quick read-through, the only obvious problem I have spotted, which would likely account for some (but maybe not all) of your problems, is that you're not properly signaling on your condition in Concurrent_queue::push.

Any time you find yourself calling unlock() on a scoped mutex, it should be an indication that something is awry. One of the main points of using scoped mutexes is that locking and unlocking are implicit when the object comes into or goes out of scope. If you find yourself needing an unlock, you may need to restructure your code.

In this case, though, you don't actually need to restructure the code: the unlock is just wrong. When a condition is signaled, the mutex needs to be locked. It is unlocked after the signal takes place. So you can drop the unlock and write push like this:

void push(Data const& data) {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push(data);
    m_cond.notify_one();
}

This unlocks the mutex when the function returns, after the condition has been signaled.

SoapBox
Are you sure ? I think that the one who coded that class (this is the only part of the code which isn't mine : http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html) wanted to make sure the lock was unlocked before notifying... Apart from that, this does not change anything on Windows (I can't try on Unix for now) : the thread runs over and over and never returns 3/5 times.
Mister Mystère
Thanks anyway for having taken time to answer me, I forgot to be polite ^^
Mister Mystère
A: 

You have an error in your completion logic.

The main() loop is reading the last entry from the queue, and blocking waiting for the next one before the m_finished flag is set.

If you stick a hefty wait before the call to m_finished.set(true) (such as sleep(5) on Linux or Sleep(5000) on Windows for a 5 second wait) then your code will hang every time.

(This doesn't address the seg faults or memory allocation errors, which are probably something else)

The problematic execution goes like this:

  1. reader thread reads last item from file and pushes on queue.
  2. main thread pops last item from queue.
  3. main thread pushes last item on queue for writer thread.
  4. main thread loops round; m_finished is not set, so it calls wait_and_pop.
  5. reader thread realises it is at the end of file and sets m_finished.
  6. main thread is now blocked waiting for another item on the reader queue, but the reader will not supply one.

The sleep call forces this order of events by putting a big delay between steps 1 and 5 on the reader thread, so the main thread has plenty of opportunity to do steps 2-4. It's a useful debugging technique for race conditions.

Anthony Williams
Thanks for your answer =) I don't understand, what do you mean? I can't afford a sleep of several seconds, I have a performance objective... If that is what the solution was about, I don't understand why it should work. I tested it anyway and it doesn't change anything :/
Mister Mystère
The sleep is a debugging aid. I've updated my answer above to reflect that.
Anthony Williams
What I understood is that when the queue is empty but m_finished is about to be set to true, we enter the loop anyway and wait_and_pop() waits for an item which will never arrive, because the reader is now finished. I solved the problem using bool try_pop(Data ...) instead of wait_and_pop(); it works like a charm. I'll try under Unix to see whether it fixed the other bugs or not. Thanks to you for having opened my eyes :)
Mister Mystère
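For readers following along, a non-blocking pop along the lines of the try_pop mentioned above might look like the sketch below. It is only an illustration: the class name TryQueue, the parameter name and the helper try_pop_demo are made up here, and std::mutex stands in for boost::mutex so that the snippet compiles on its own.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Illustrative queue (TryQueue is a made-up name) with a non-blocking pop.
// std::mutex is used here in place of boost::mutex.
template <typename Data>
class TryQueue {
    std::queue<Data> m_queue;
    mutable std::mutex m_mutex;

public:
    void push(Data const& data) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(data);
    }

    // Returns false at once if the queue is empty, instead of waiting.
    bool try_pop(Data& popped) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty()) {
            return false;
        }
        popped = m_queue.front();
        m_queue.pop();
        return true;
    }
};

// Usage sketch: poll an empty queue, push one item, poll again.
inline void try_pop_demo() {
    TryQueue<int> q;
    int value = 0;
    assert(!q.try_pop(value));                // nothing queued yet
    q.push(42);
    assert(q.try_pop(value) && value == 42);  // item is returned
}
```

With such a method, the transfer loop in main() would presumably call try_pop(row) and only push to the writer when it returns true, so the loop condition is re-checked even when no item is available.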
Oh, and if something shocks you, please tell me ! I am not allowed to make any mistake.
Mister Mystère
Using try_pop will work, but loses the benefits of a blocking queue, as the main thread will consume CPU time looping and checking for new messages. It is often worth having a special "sentinel" value that you can post to the queue to indicate "no more data". This also avoids the need for the separate "finished" flag.
Anthony Williams
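The sentinel idea from the last comment can be sketched roughly as follows. This is not code from the thread: Message, MessageQueue, produce, consume and run_pipeline are hypothetical names, plain strings stand in for CSV::Row, and std::thread, std::mutex and std::condition_variable are assumed in place of their boost equivalents so the example is self-contained.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Hypothetical message type: last == true marks the "no more data" sentinel.
struct Message {
    std::string line;
    bool last;
};

// Blocking queue shaped like Concurrent_queue from the question,
// with std:: primitives assumed instead of boost:: ones.
class MessageQueue {
    std::queue<Message> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;

public:
    void push(Message const& msg) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(msg);
        m_cond.notify_one();
    }

    void wait_and_pop(Message& popped) {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (m_queue.empty()) {
            m_cond.wait(lock);
        }
        popped = m_queue.front();
        m_queue.pop();
    }
};

// Producer: pushes every line, then exactly one sentinel.
void produce(std::vector<std::string> const& lines, MessageQueue& out) {
    for (std::size_t i = 0; i < lines.size(); ++i) {
        Message m = {lines[i], false};
        out.push(m);
    }
    Message end = {std::string(), true};
    out.push(end);
}

// Consumer: blocks in wait_and_pop and stops only on the sentinel,
// so no separate "finished" flag is consulted.
std::vector<std::string> consume(MessageQueue& in) {
    std::vector<std::string> received;
    Message m = {std::string(), false};
    for (;;) {
        in.wait_and_pop(m);
        if (m.last) {
            break;
        }
        received.push_back(m.line);
    }
    return received;
}

// Runs the producer on its own thread and consumes on the calling thread.
std::vector<std::string> run_pipeline(std::vector<std::string> const& lines) {
    MessageQueue queue;
    std::thread producer(produce, std::cref(lines), std::ref(queue));
    std::vector<std::string> received = consume(queue);
    producer.join();
    assert(received.size() == lines.size());
    return received;
}
```

Because the sentinel travels through the same queue as the data, the consumer can never observe "finished" before it has seen the last item, which is the ordering problem described in the numbered steps above. In the original three-stage layout, the main thread would forward the sentinel to the writer queue as well.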