Using pthreads on Linux 2.6.30, I am trying to send a single signal that will cause multiple threads to begin execution. The broadcast seems to be received by only one thread. I have tried both pthread_cond_signal and pthread_cond_broadcast, and both seem to behave the same way. For the mutex passed to pthread_cond_wait, I have tried both a common mutex and separate (local) mutexes, with no apparent difference.


worker_thread(void *p)
{
   // setup stuff here

   printf("Thread %d ready for action \n", p->thread_no);
   pthread_cond_wait(p->cond_var, p->mutex);
   printf("Thread %d off to work \n", p->thread_no);

   // work stuff
}

dispatch_thread(void *p)
{
   // setup stuff

   printf("Wakeup, everyone ");
   pthread_cond_broadcast(p->cond_var);
   printf("everyone should be working \n");

   // more stuff
}

main()
{
   pthread_cond_init(cond_var);

   for (i=0; i!=num_cores; i++) {
      pthread_create(worker_thread...);
   }

   pthread_create(dispatch_thread...);
}

Output:


  Thread 0 ready for action
  Thread 1 ready for action
  Thread 2 ready for action
  Thread 3 ready for action
  Wakeup, everyone
  everyone should be working
  Thread 0 off to work

What's a good way to send signals to all the threads?

+3  A: 

First off, you should have the mutex locked at the point where you call pthread_cond_wait(). It's generally a good idea to hold the mutex when you call pthread_cond_broadcast(), as well.

Second off, you should call pthread_cond_wait() in a loop that re-checks the condition you are actually waiting for, and keep waiting until that condition holds. Spurious wakeups can happen, and you must be able to handle them.
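As a minimal sketch of those two points (hold the mutex around the wait, and loop on a predicate), here is a small "start gate". The names start_gate, start_gate_wait, start_gate_open, gate_worker and run_gate_demo are made up for illustration and are not part of pthreads; error checking is mostly omitted to keep it short.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical "start gate"; every name in this sketch is illustrative. */
struct start_gate {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    bool            open;    /* the predicate, protected by mutex */
    int             passed;  /* threads that got through, protected by mutex */
};

/* Block until the gate is open, re-checking the predicate after every wakeup. */
static void start_gate_wait(struct start_gate *g)
{
    pthread_mutex_lock(&g->mutex);
    while (!g->open)
        pthread_cond_wait(&g->cond, &g->mutex);
    g->passed += 1;
    pthread_mutex_unlock(&g->mutex);
}

/* Open the gate and wake every thread currently waiting on it. */
static void start_gate_open(struct start_gate *g)
{
    pthread_mutex_lock(&g->mutex);
    g->open = true;
    pthread_cond_broadcast(&g->cond);
    pthread_mutex_unlock(&g->mutex);
}

static void *gate_worker(void *arg)
{
    start_gate_wait(arg);
    return NULL;
}

/* Start up to 16 workers, open the gate, join the workers,
   and return how many of them got through. */
int run_gate_demo(int nthreads)
{
    struct start_gate g;
    pthread_t tid[16];
    int started = 0;

    assert(nthreads >= 0 && nthreads <= 16);
    pthread_mutex_init(&g.mutex, NULL);
    pthread_cond_init(&g.cond, NULL);
    g.open = false;
    g.passed = 0;

    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&tid[started], NULL, gate_worker, &g) == 0)
            started++;
    }
    start_gate_open(&g);
    for (int i = 0; i < started; i++)
        pthread_join(tid[i], NULL);

    pthread_cond_destroy(&g.cond);
    pthread_mutex_destroy(&g.mutex);
    return g.passed;
}
```

Because the predicate is checked before the first wait, a worker that reaches start_gate_wait() after the gate has already been opened simply does not block, so in this sketch it does not matter whether the broadcast happens before or after a given worker arrives.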

Finally, your actual problem: you are signaling all threads, but some of them aren't waiting yet when the signal is sent. A condition variable does not remember a broadcast that nobody was waiting for. Your main thread and dispatch thread are racing your worker threads: if the main thread can launch the dispatch thread, and the dispatch thread can broadcast on the condition variable before a worker thread has reached pthread_cond_wait(), then that worker will never wake up.
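The lost wakeup can be shown without any race at all. The sketch below broadcasts while nobody is waiting and then waits on the same condition variable with a 100 ms deadline; the helper name broadcast_is_forgotten and the timeout value are assumptions made for this illustration only.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical helper: broadcast with no waiters, then wait on the same
   condition variable with a 100 ms deadline. Returns 1 if the wait timed
   out, i.e. the earlier broadcast was not remembered. */
int broadcast_is_forgotten(void)
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    struct timespec deadline;
    int rc;

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);

    /* Nobody is waiting yet, so this wakes nobody. */
    pthread_cond_broadcast(&cond);

    /* Absolute deadline 100 ms from now; a condition variable with default
       attributes measures its timeout against CLOCK_REALTIME. */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_nsec += 100L * 1000L * 1000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&mutex);
    do {
        /* Nothing signals after this point, so a return of 0 could only
           be a spurious wakeup; keep waiting until the deadline. */
        rc = pthread_cond_timedwait(&cond, &mutex, &deadline);
        /* Any other result would mean this sketch misused the API. */
        assert(rc == 0 || rc == ETIMEDOUT);
    } while (rc == 0);
    pthread_mutex_unlock(&mutex);

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return rc == ETIMEDOUT;
}
```

A worker that calls pthread_cond_wait() after the dispatcher's broadcast is in the same position as the waiter here, except that it has no timeout to rescue it.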

You need a synchronization point prior to signaling, where you hold off on signaling until all threads are known to be waiting for the signal. That, or you can keep signaling until you know all threads have been woken up.

In this case, you could use the mutex to protect a count of sleeping threads. Each worker thread grabs the mutex and increments the count. If the count then matches the number of worker threads, it is the last thread to increment the count, so it signals a second condition variable, which shares the same mutex, to tell the sleeping dispatch thread that all threads are ready. The worker then waits on the original condition, which causes it to release the mutex.

If the dispatch thread wasn't sleeping yet when the last worker thread signaled that condition, it will find that the count already matches the desired count and will not bother waiting; it immediately broadcasts on the shared condition to wake the workers, who are now guaranteed to all be sleeping.

Anyway, here's some working source code that fleshes out your sample code and includes my solution:

#include <stdio.h>
#include <string.h>
#include <pthread.h>

static const int num_cores = 8;

struct sync {
   pthread_mutex_t *mutex;
   pthread_cond_t *cond_var;
   int thread_no;
};

/* Both counters below are protected by the shared mutex. */
static int sleeping_count = 0;   /* workers that have reached the wait */
static int go = 0;               /* set once the dispatcher has broadcast */
static pthread_cond_t all_sleeping_cond = PTHREAD_COND_INITIALIZER;

/* pthread functions return an error number instead of setting errno. */
static void
warn_pthread(int err, const char *what)
{
   fprintf(stderr, "%s: %s\n", what, strerror(err));
}

void *
worker_thread(void *p_)
{
   struct sync *p = p_;
   // setup stuff here

   pthread_mutex_lock(p->mutex);
   printf("Thread %d ready for action \n", p->thread_no);

   sleeping_count += 1;
   if (sleeping_count >= num_cores) {
      /* Last worker to go to sleep. */
      pthread_cond_signal(&all_sleeping_cond);
   }

   /* Loop so that a spurious wakeup cannot start the work early. */
   while (!go) {
      int err = pthread_cond_wait(p->cond_var, p->mutex);
      if (err) {
         warn_pthread(err, "pthread_cond_wait");
         break;
      }
   }
   printf("Thread %d off to work \n", p->thread_no);
   pthread_mutex_unlock(p->mutex);

   // work stuff
   return NULL;
}

void *
dispatch_thread(void *p_)
{
   struct sync *p = p_;
   // setup stuff

   pthread_mutex_lock(p->mutex);
   while (sleeping_count < num_cores) {
      pthread_cond_wait(&all_sleeping_cond, p->mutex);
   }
   printf("Wakeup, everyone ");
   go = 1;
   int err = pthread_cond_broadcast(p->cond_var);
   if (err) warn_pthread(err, "pthread_cond_broadcast");
   printf("everyone should be working \n");
   pthread_mutex_unlock(p->mutex);

   // more stuff
   return NULL;
}

int
main(void)
{
   pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
   pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;

   pthread_t worker[num_cores];
   struct sync info[num_cores];
   for (int i = 0; i < num_cores; i++) {
      struct sync *p = &info[i];
      p->mutex = &mutex;
      p->cond_var = &cond_var;
      p->thread_no = i;
      pthread_create(&worker[i], NULL, worker_thread, p);
   }

   pthread_t dispatcher;
   struct sync p = {&mutex, &cond_var, num_cores};
   pthread_create(&dispatcher, NULL, dispatch_thread, &p);

   /* Join everything so that the mutex, condition variable, and info[]
      on main's stack stay valid for as long as any thread uses them. */
   pthread_join(dispatcher, NULL);
   for (int i = 0; i < num_cores; i++) {
      pthread_join(worker[i], NULL);
   }
   return 0;
}
Jeremy W. Sherman