views:

413

answers:

6

The problem is that when I run the code below, on a single core, sometimes it runs correctly,and sometimes I get segmentation fault. Probably this problem will occure more frequently on a multi-core machine. I need to know where this non-determinism is introduces in my program and how can I resolve it.thanks.

int numThreads = 4;

class Evaluator;

struct E {
    Evaluator* evaluator;
    int id;
};

class Evaluator {
public:
    pthread_t * threads;
    sem_t* fork_sync;
    sem_t* join_sync;
    int tin;
    pthread_mutex_t tin_mut;

    double * d;
    int sz;
    int cursor;
    pthread_mutex_t c_mut;

    Evaluator(sem_t* fs, sem_t* js) {
        fork_sync = fs;
        join_sync = js;
        threads = new pthread_t[numThreads];
        tin = 0;
        pthread_mutex_init(&tin_mut,NULL);
        for(int i=0 ;i<numThreads; i++) {
            E arg;
            arg.evaluator = this;
            arg.id = i;
            pthread_create(&threads[i],NULL,(void* (*) (void*) )func,(void*)&arg);
        }


        //dummy init
        sz = 20;
        d = new double[sz];
        for(int i=0; i<sz ; i++) d[i] = .5 + i;
        cursor = 0;
        pthread_mutex_init(&c_mut,NULL);
    }

    static void func(E* e) {        
        Evaluator* eval = e -> evaluator;
        eval -> go(e -> id);
    }

    void reset() {
        cursor = 0;
    }

    void go(int id) {
        while(1) {
            sem_wait(fork_sync);

            pthread_mutex_lock(&tin_mut);
            ++tin;
            pthread_mutex_unlock(&tin_mut);

            while(1) {
                int idx;
                pthread_mutex_lock(&c_mut);
                idx = cursor++;
                pthread_mutex_unlock(&c_mut);
                if(idx >= sz ) break;
                // do the evaluation
                cout << "evaluating  index " << idx << " using thread " << id << endl;
            }

            int remain;
            pthread_mutex_lock(&tin_mut);
            remain = --tin;
            pthread_mutex_unlock(&tin_mut);
            if(remain == 0) sem_post(join_sync);
        }
    }


};

int main(int argc, char *argv[]) {

    sem_t fork_sync;
    sem_t join_sync;

    sem_init(&fork_sync,0,0);
    sem_init(&join_sync,0,0);

    Evaluator e(&fork_sync,&join_sync);


    //evaluating t times
    int t = 3;
    for(int i=0; i<t; i++) {
        cout << "---------- evaluation number :" << i << endl;
        e.reset();
        for(int j=0; j<numThreads; j++) sem_post(&fork_sync);
        sem_wait(&join_sync);
        cout << endl;
    }

    return 0;
}
A: 

Have you been able to reproduce the problem in debugger?

If you compiled optimized code; while the problem most likely is in your code, it might be worth trying with compiler optimizations off (-O0)

Kimvais
Threading problems are notoriously difficult to debug in debuggers, especially race conditions. But it's always worth a try!
Lars Wirzenius
Lars: I'm fully aware of that, however since the original poster does not mention trying and failing, I think this could lead to bad practice of "let's ask the internet before I try to figure out this myself" :)
Kimvais
+1  A: 

Navid, could you please provide an example that runs out of the box next time?

it doesn't hurt that much to add the following lines on top on your example

#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

// compile with: g++ -g -pthread main.cpp -o main -lrt -lpthread

When I start the program in the debugger then indeed it crashes sometimes on the sem_wait() line (and sometimes it does not crash!)

void go(int id) {
    while(1) {
        sem_wait(fork_sync); //  <--- seems to crash here
        ...
sure, you are right, that way is easier to compile, I just wanted it to be shorter :)I don't see the reason why it should crash on this line, there is no dangling pointer there ! btw thank a lot
Navid
You're welcome! Unfortunately I am not a pthreads expert. But that's the opportunity I've been waiting for to dive into it, and I will definitely "stay on the line" until this issue is solved :-)
+1  A: 

arg is on the stack. You are taking its address and passing this address to another thread. Race condition (value on the stack can be overwritten before the newly created thread reads it).

E arg;
arg.evaluator = this;
arg.id = i;
pthread_create(&threads[i],NULL,(void* (*) (void*) )func,(void*)&arg);

Solution:

E* arg = new E();
arg->evaluator = this;
arg->id = i;
pthread_create(&threads[i],NULL,(void* (*) (void*) )func,(void*)arg);

And don't forget to delete e in func.

ygrek
same behaviour, it crashes sometimes, sometimes it doesn't
I would argue that behavior is not the same - it crashes in another place :) Clearly there was more than one bug in this code. Semaphores are not used correctly. Add print statement at the end of `main()` and at `sem_post(join_sync)` and it becomes obvious. Program exits (and Evaluator object is destroyed) before all threads finish. Tasks do not get distributed between threads the way you expect.
ygrek
You are absolutely right, there was a logic problem . I wanted to sem_post(join_sync) be executed after all threads have done the inner while, the problem was that the wrong condition is checked for this purpose, as the tin can become 1 and zero , before other threads inter the inner while. thus sem_post(join_sync) is executed by more than a thread, something that is not expceted. Thanks a lot
Navid
A: 

try run under gdb and get back-trace.

vitaly.v.ch
A: 
josefx
after allocating arg on heap (using new) , the problem still remains.
Navid
A: 

Here is a quick fix to the second bug. This ensures that worker thread finishes before launching another iteration.

--- a/misc/so/sem_wait/q.cpp
+++ b/misc/so/sem_wait/q.cpp
@@ -83,7 +83,7 @@ public:
             pthread_mutex_lock(&tin_mut);
             remain = --tin;
             pthread_mutex_unlock(&tin_mut);
-            if(remain == 0) sem_post(join_sync);
+            sem_post(join_sync);
         }
     }

@@ -107,9 +107,11 @@ int main(int argc, char *argv[]) {
         cout << "---------- evaluation number :" << i << endl;
         e.reset();
         for(int j=0; j<numThreads; j++) sem_post(&fork_sync);
-        sem_wait(&join_sync);
+        for(int j=0; j<numThreads; j++) sem_wait(&join_sync);
         cout << endl;
     }

+    cout << "exit" << endl;
+
     return 0;
 }
ygrek
Thanks, I fixed it in a similar way!
Navid