 #include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #define NUM_THREADS  4
 #define TCOUNT 5
 #define COUNT_LIMIT 13

 int     done = 0;
 int    count = 0;
 int     thread_ids[4] = {0,1,2,3};
 int thread_runtime[4] = {0,5,4,1};
 pthread_mutex_t count_mutex;
 pthread_cond_t count_threshold_cv;

 void *inc_count(void *t)
 {
   int i;
   long my_id = (long)t;
  long run_time = thread_runtime[my_id - 1];   /* ids start at 1; indexing by my_id would read past the array for id 4 */
   if (my_id==2 && done==0) {
     for(i=0; i<5 ; i++) {
       if (i==4) {
         done = 1;
       }
       pthread_mutex_lock(&count_mutex);
       count++;

       if (count == COUNT_LIMIT) {
         pthread_cond_signal(&count_threshold_cv);
         printf("inc_count(): thread %ld, count = %d  Threshold reached.\n",
           my_id, count);
       }
       printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count);
       pthread_mutex_unlock(&count_mutex);
     }
   }

   if (my_id==3 && done==1) {
     for(i=0; i< 4 ; i++) {
       if (i==3) {
         done = 2;
       }
       pthread_mutex_lock(&count_mutex);
       count++;

       if (count == COUNT_LIMIT) {
         pthread_cond_signal(&count_threshold_cv);
         printf("inc_count(): thread %ld, count = %d  Threshold reached.\n",
           my_id, count);
       }
       printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count);
       pthread_mutex_unlock(&count_mutex);
     }
   }

   if (my_id==4 && done==2) {
     for(i=0; i<8; i++) {
       pthread_mutex_lock(&count_mutex);
       count++;
       if (count == COUNT_LIMIT) {
         pthread_cond_signal(&count_threshold_cv);
         printf("inc_count(): thread %ld, count = %d  Threshold reached.\n",
           my_id, count);
       }
       printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count);
       pthread_mutex_unlock(&count_mutex);
     }
   }
   pthread_exit(NULL);
 }

 void *watch_count(void *t)
 {
   long my_id = (long)t;

   printf("Starting watch_count(): thread %ld\n", my_id);
   pthread_mutex_lock(&count_mutex);
   if (count<COUNT_LIMIT) {
     pthread_cond_wait(&count_threshold_cv, &count_mutex);
     printf("watch_count(): thread %ld Condition signal received.\n", my_id);
     count += 125;
     printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
   }
   pthread_mutex_unlock(&count_mutex);
   pthread_exit(NULL);
 }

 int main (int argc, char *argv[])
 {
   int i, rc;
   long t1=1, t2=2, t3=3, t4=4;
   pthread_t threads[4];
   pthread_attr_t attr;

   pthread_mutex_init(&count_mutex, NULL);
   pthread_cond_init (&count_threshold_cv, NULL);
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
   pthread_create(&threads[0], &attr, watch_count, (void *)t1);
   pthread_create(&threads[1], &attr, inc_count, (void *)t2);
   pthread_create(&threads[2], &attr, inc_count, (void *)t3);
   pthread_create(&threads[3], &attr, inc_count, (void *)t4);

   for (i=0; i<NUM_THREADS; i++) {
     pthread_join(threads[i], NULL);
   }
   printf ("Main(): Waited on %d  threads. Done.\n", NUM_THREADS);

   pthread_attr_destroy(&attr);
   pthread_mutex_destroy(&count_mutex);
   pthread_cond_destroy(&count_threshold_cv);
   pthread_exit(NULL);
 }

So this code creates 4 threads. Thread 1 keeps track of the count value while the other 3 increment it. The run time is the number of times a thread will increment the count. I have a done value that lets the first incrementing thread finish its run time before the next one starts, so it's like First Come First Serve.

My question is: is there a better way of implementing this? I have read about SCHED_FIFO and SCHED_RR, but I don't know how to use them in this code, or whether they even apply here.

+1  A: 

SCHED_FIFO and SCHED_RR are real-time scheduling classes. They are not meant for use in ordinary code. You should be able to do most anything you need using pthread mutexes.

Zan Lynx
OK, so then I would have to implement a round-robin algorithm myself, right?
MRP
A: 

If I understand the question correctly, you are trying to produce a sort of pipeline where the next thread picks up where the previous one stops. The cleanest solution in this case is to use binary semaphores.

Say you have four threads. Create four semaphores with initial values of {1, 0, 0, 0}. Assign one semaphore per thread and have each thread down its own semaphore upon start, then up the next semaphore in the chain (modulo the number of threads). Start all the threads: the first acquires its semaphore immediately and does its work, while the others block on theirs. When the first thread finishes its work, it ups the next semaphore, waking up the next thread, then loops back to the beginning, and so on.

I think the real-time scheduling classes you mention have nothing to do with the problem at hand.

Some notes on your code:

  • Variables that signal state changes between threads without lock protection (like done here) have to be volatile, so the compiler does not optimize the reads out of the loops.
  • You can use the thread-function argument to pass more complex information to a thread, e.g. a pointer to a structure.
  • You always want to call pthread_cond_wait in a loop whose condition tests the predicate you are waiting on. This guards against spurious wakeups.
  • You always want to call pthread_cond_signal either after the unlock, or as the very last thing before the unlock. This avoids wasted wakeup/sleep cycles in the waiting threads: they wake up, find the mutex still locked, and block (sleep) again.
  • Avoid having many threads contend on the same lock; that leads to the thundering-herd problem.
  • Always check the return values of the pthread calls.

Hope this helps.

Nikolai N Fetissov
Yeah, some of it makes sense. I do think you are right about using semaphores.
MRP
+1 from me for semaphores. -1 for recommending volatile for *anything*. So it's a wash.
Zan Lynx
@Zan: `volatile` in practice is perfectly fine for signaling "one-way"/edge state changes like shutdown flags. It of course does not apply to any synchronization/locking/mutual-exclusion scenarios.
Nikolai N Fetissov
@Nikolai: No it isn't fine. Nothing in any specification requires volatile changes to become visible to other CPUs in the system. It works on x86 hardware. You can't claim it is portable though.
Zan Lynx
@Zan: That's why I don't like this discussion. I work on real computers, real OSes, specific compilers. I learned not to trust specifications and to test my stuff. BTW, it works on sparc too :)
Nikolai N Fetissov
@Nikolai: Well just remember this when it doesn't work on a machine that requires an explicit locked cache write to get it onto the other CPUs. Then you will know how to fix it. :)
Zan Lynx
@Zan: I believe you, there's a ton of fun hardware out there, but then wouldn't normal shared data protected by locks have to be handled in a special way too? What specification requires those changes to propagate anywhere? Should I assume a full cache flush on every lock operation?
Nikolai N Fetissov
I made a new post and cleaned up the code. I tried using semaphores to do RR scheduling, but I need to create a dispatcher, and I'm not sure how to do that.
MRP