Hi,
I want to implement a parallel reading-processing-writing line by line based on boost::thread, but the current version has indefinite behaviour : the following test reads a CSV file by filling the read (concurrent) queue, which is simply transferred into the writing queue to be written in an output file (no processing for now).
Problems encountered :
- On both Windows and Unix, the program randomly never ends (~3/5 times), and generates a SIGSEGV (~1/100)
- On Unix, there are many errors : SIGABRT at creation of the thread, "memory cloberred before allocated block" (-> SIGABRT as well) after creation, randomly between 1 and ~15 lines.
I HATE to give the problems and the codes and let the others answer (I am sometimes on your side of the topics), but believe me, I can't think of anything else to correct it (dealing with threads, debugging's a nightmare), so I apologize in advance. Here it is :
Main.cpp :
#include "io.hpp"
#include <iostream>
int main(int argc, char *argv[]) {
CSV::Reader reader;
CSV::Writer writer;
if(reader.open("test_grandeur_nature.csv") && writer.open("output.txt")) {
CSV::Row row;
reader.run(); //Reads the CSV file and fills the read queue
writer.run(); //Reads the to-be-written queue and writes it to a txt file
//The loop is supposed to end only if the reader is finished and empty
while(!(reader.is_finished() && reader.empty())) {
//Transfers line by line from the read to the to-be-written queues
reader.wait_and_pop(row);
writer.push(row);
}
//The reader will likely finish before the writer, so he has to finish his queue before continuing.
writer.finish();
}
else {
std::cout << "File error";
}
return EXIT_SUCCESS;
}
Io.hpp :
#ifndef IO_H_INCLUDED
#define IO_H_INCLUDED
#include "threads.hpp"
#include <fstream>
namespace CSV {
class Row {
std::vector<std::string> m_data;
friend class Iterator;
friend void write_row(Row const &row, std::ostream &stream);
void read_next(std::istream& csv);
public:
inline std::string const& operator[](std::size_t index) const {
return m_data[index];
}
inline std::size_t size() const {
return m_data.size();
}
};
/** Reading *************************************************************************/
class Iterator {
public:
Iterator(std::istream& csv) : m_csv(csv.good() ? &csv : NULL) {
++(*this);
}
Iterator() : m_csv(NULL) {}
//Pre-Increment
Iterator& operator++() {
if (m_csv != NULL) {
m_row.read_next(*m_csv);
m_csv = m_csv->good() ? m_csv : NULL;
}
return *this;
}
inline Row const& operator*() const {
return m_row;
}
inline bool operator==(Iterator const& rhs) {
return ((this == &rhs) || ((this->m_csv == NULL) && (rhs.m_csv == NULL)));
}
inline bool operator!=(Iterator const& rhs) {
return !((*this) == rhs);
}
private:
std::istream* m_csv;
Row m_row;
};
class Reader : public Concurrent_queue<Row>, public Thread {
std::ifstream m_csv;
Thread_safe_value<bool> m_finished;
void work() {
if(!!m_csv) {
for(Iterator it(m_csv) ; it != Iterator() ; ++it) {
push(*it);
}
m_finished.set(true);
}
}
public:
Reader() {
m_finished.set(false);
}
inline bool open(std::string path) {
m_csv.open(path.c_str());
return !!m_csv;
}
inline bool is_finished() {
return m_finished.get();
}
};
/** Writing ***************************************************************************/
void write_row(Row const &row, std::ostream &stream);
//Is m_finishing really thread-safe ? By the way, is it mandatory ?
class Writer : public Concurrent_queue<Row>, public Thread {
std::ofstream m_csv;
Thread_safe_value<bool> m_finishing;
void work() {
if(!!m_csv) {
CSV::Row row;
while(!(m_finishing.get() && empty())) {
wait_and_pop(row);
write_row(row, m_csv);
}
}
}
public:
Writer() {
m_finishing.set(false);
}
inline void finish() {
m_finishing.set(true);
catch_up();
}
inline bool open(std::string path) {
m_csv.open(path.c_str());
return !!m_csv;
}
};
}
#endif
Io.cpp :
#include "io.hpp"
#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>
void CSV::Row::read_next(std::istream& csv) {
std::string row;
std::getline(csv, row);
boost::tokenizer<boost::escaped_list_separator<char> > tokenizer(row, boost::escaped_list_separator<char>('\\', ';', '\"'));
m_data.assign(tokenizer.begin(), tokenizer.end());
}
void CSV::write_row(Row const &row, std::ostream &stream) {
std::copy(row.m_data.begin(), row.m_data.end(), std::ostream_iterator<std::string>(stream, ";"));
stream << std::endl;
}
Threads.hpp :
#ifndef THREADS_HPP_INCLUDED
#define THREADS_HPP_INCLUDED
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class Thread {
protected:
boost::thread *m_thread;
virtual void work() = 0;
void do_work() {
work();
}
public:
Thread() : m_thread(NULL) {}
virtual ~Thread() {
catch_up();
if(m_thread != NULL) {
delete m_thread;
}
}
inline void catch_up() {
if(m_thread != NULL) {
m_thread->join();
}
}
void run() {
m_thread = new boost::thread(boost::bind(&Thread::do_work, boost::ref(*this)));
}
};
/** Thread-safe datas **********************************************************/
#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
template <class T>
class Thread_safe_value : public boost::noncopyable {
T m_value;
boost::mutex m_mutex;
public:
T const &get() {
boost::mutex::scoped_lock lock(m_mutex);
return m_value;
}
void set(T const &value) {
boost::mutex::scoped_lock lock(m_mutex);
m_value = value;
}
};
template<typename Data>
class Concurrent_queue {
std::queue<Data> m_queue;
mutable boost::mutex m_mutex;
boost::condition_variable m_cond;
public:
void push(Data const& data) {
boost::mutex::scoped_lock lock(m_mutex);
m_queue.push(data);
lock.unlock();
m_cond.notify_one();
}
bool empty() const {
boost::mutex::scoped_lock lock(m_mutex);
return m_queue.empty();
}
void wait_and_pop(Data& popped) {
boost::mutex::scoped_lock lock(m_mutex);
while(m_queue.empty()) {
m_cond.wait(lock);
}
popped = m_queue.front();
m_queue.pop();
}
};
#endif // THREAD_HPP_INCLUDED
This project is important and I would really appreciate if you could help me out =)
Thanks aforehand.
Regards,
Mister Mystère.