Hey everyone. I've been working on some code for a while that spawned threads as needed, but I decided a thread pool would be a simpler and more effective solution. It's implemented with a queue I wrote that does condition-variable waits on enqueue and dequeue. The only reason I'm posting is that I'm now getting weird, random errors throughout my code that never happened before the switch to the thread pool, and they go away when I add some debug print statements. If my code starts working because of print statements, that sounds like memory or stack corruption, possibly caused by some bad threading code.

I figured the first place to check for correctness and thread safety would be the thread pool. Here are the three main functions: threadstart is the function each thread sits in, waiting on the dequeue; threadpool_init spawns the threads; and threadpool_q_workitem queues up a work item. The q_enq function signals the condition variable that wakes up a waiting thread so it can dequeue.

void *
threadstart(void *arg)
{
    threadpool_t * tp = (threadpool_t*)arg;

    while (1)
    {
        workitem_t *work = q_dq(tp->workqueue);

        if (work == NULL)
            break;

        (*work->action)(work->arg);
        free(work);
    }

    pthread_exit(NULL);
}

threadpool_t *
threadpool_init(int max_threads, int max_workload)
{
    threadpool_t *tp;
    pthread_attr_t attr;
    register int i=0;
    int rc =0;
    ASSERT(max_threads > 0 && max_workload > 0);

    tp = malloc(sizeof(threadpool_t));
    tp->max_threads = max_threads;
    tp->threads = calloc(max_threads, sizeof(pthread_t));
    tp->workqueue = q_init(max_workload);

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setschedpolicy(&attr, SCHED_RR);

    for (; i < max_threads; i++)
    {
     rc = pthread_create(&tp->threads[i], &attr, threadstart, tp);

     /* worry about errors creating threads later :( */
     if (rc) printf("Error creating threadpool thread %d [%d]\r\n", i, rc);
    }
    pthread_attr_destroy(&attr);

    return tp;
}
void
threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg)
{
    workitem_t *item;
    ASSERT(tp != NULL);

    item = malloc(sizeof(workitem_t));
    item->action = action;
    item->arg = arg;
    q_enq(tp->workqueue, (void*)item);
}

EDIT: Queue Functions

void q_enq(struct queue *q, void *data) {
    struct timeval now;
    struct timespec timeout;

    pthread_mutex_lock(q->mut);

    while (q->full) {
     gettimeofday(&now, (struct timezone *)0);
     timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
     timeout.tv_nsec = now.tv_usec * 1000;

     pthread_cond_timedwait(q->notfull, q->mut, &timeout);

    }
    q->buffer[q->tail++] = data;
    if (q->tail == q->num) {
     q->tail = 0;
    }
    if (q->head == q->tail) {
     q->full = 1;
    }
    q->empty = 0;

    pthread_mutex_unlock(q->mut);
    pthread_cond_signal(q->notempty);
}

void *q_dq(struct queue *q) {
    void *data;
    int rc;
    struct timeval now;
    struct timespec timeout;

    pthread_mutex_lock(q->mut);

    while (q->empty) {
     gettimeofday(&now, NULL);
     timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
     timeout.tv_nsec = now.tv_usec * 1000;
     if (q->finished) {
      pthread_mutex_unlock(q->mut);
      return NULL;
     }

     rc = pthread_cond_timedwait(q->notempty, q->mut, &timeout);
     if (q->finished) {
      pthread_mutex_unlock(q->mut);
      return NULL;
     }
    }
    data = q->buffer[q->head++];
    if (q->head == q->num) {
     q->head = 0;
    }
    if (q->head == q->tail) {
     q->empty = 1;
    }
    q->full = 0;
    pthread_mutex_unlock(q->mut);
    pthread_cond_signal(q->notfull);

    return data;
}
+6  A: 

I think you should do something like this:

void q_enq(struct queue *q, void *data) {
 int next;

 // wait until there's room
 do{
  pthread_mutex_lock( q->mut );

  next = q->tail + 1;
  if( next == q->num) {
   next = 0;
  }

  // still room to add
  if( q->head != next )
   break;

  pthread_mutex_unlock(q->mut);
  sched_yield();
 } while( 1 );

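 // store at the current tail, then advance, so q_dq (which reads
 // buffer[head] while head != tail) sees this slot
 q->buffer[ q->tail ] = data;
 q->tail = next;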

 // signal consumer and unlock mutex
 pthread_cond_signal(q->notempty);
 pthread_mutex_unlock(q->mut);
}

void *q_dq(struct queue *q) {
 void *data;

 pthread_mutex_lock(q->mut);

 // while empty wait
 while( q->tail == q->head ){
  pthread_cond_wait(q->notempty, q->mut );
 }

 // get next and wrap
 data = q->buffer[q->head++];
 if (q->head == q->num) {
  q->head = 0;
 }

 pthread_mutex_unlock(q->mut);
 return data;
}
sfossen
I'll try that out later today. I'm running some stuff and don't have time for a rebuild...
Nicholas Mancuso
Did you get a chance to try it?
sfossen
Not yet. We have some deadlines on some other stuff. I'll definitely let you know though.
Nicholas Mancuso
I just realized I don't have the pthread_yield() subroutine!
Nicholas Mancuso
I guess I could get away with sleep(0), but that feels kind of like a hack to me. Why couldn't I just use another conditional wait?
Nicholas Mancuso
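On the question of using another conditional wait instead of yielding: here is a minimal sketch of what that could look like, with the producer blocking on a "not full" condition and the consumer blocking on a "not empty" condition. The bq_t type, its fields, and the bq_* function names are hypothetical and are not the struct queue from the question; it also tracks an element count instead of separate full/empty flags, and leaves out the timeout and finished handling.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical bounded queue; names and layout are assumptions. */
typedef struct {
    void **buffer;
    int capacity, head, tail, count;
    pthread_mutex_t mut;
    pthread_cond_t notfull, notempty;
} bq_t;

bq_t *bq_init(int capacity)
{
    bq_t *q;
    assert(capacity > 0);
    q = malloc(sizeof *q);
    if (q == NULL)
        return NULL;
    q->buffer = calloc((size_t)capacity, sizeof *q->buffer);
    if (q->buffer == NULL) {
        free(q);
        return NULL;
    }
    q->capacity = capacity;
    q->head = q->tail = q->count = 0;
    pthread_mutex_init(&q->mut, NULL);
    pthread_cond_init(&q->notfull, NULL);
    pthread_cond_init(&q->notempty, NULL);
    return q;
}

void bq_enq(bq_t *q, void *data)
{
    pthread_mutex_lock(&q->mut);
    /* Sleep until a consumer frees a slot; the loop re-checks the
       predicate to cope with spurious wakeups. */
    while (q->count == q->capacity)
        pthread_cond_wait(&q->notfull, &q->mut);
    q->buffer[q->tail] = data;
    q->tail = (q->tail + 1) % q->capacity;
    q->count++;
    pthread_cond_signal(&q->notempty);
    pthread_mutex_unlock(&q->mut);
}

void *bq_dq(bq_t *q)
{
    void *data;
    pthread_mutex_lock(&q->mut);
    /* Sleep until a producer adds an item. */
    while (q->count == 0)
        pthread_cond_wait(&q->notempty, &q->mut);
    data = q->buffer[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    pthread_cond_signal(&q->notfull);
    pthread_mutex_unlock(&q->mut);
    return data;
}
```

Because both sides sleep on a condition variable, neither sched_yield() nor sleep(0) is needed. The predicate is always tested in a while loop under the mutex, which is what keeps the waits safe.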
What OS are you using?
sfossen
If it's Mac OS X, try sched_yield() from <sched.h>.
sfossen
I'm using Windows Server 2008.
Nicholas Mancuso
For Windows it's the same: [sched_yield](http://sourceware.org/pthreads-win32/manual/sched_yield.html).
sfossen
You could also use [pthread_delay_np](http://sourceware.org/pthreads-win32/manual/pthread_delay_np.html).
sfossen
+1  A: 

You're calling pthread_attr_destroy too early; don't call pthread_attr_destroy until it's time to destroy the pool (i.e., keep attr around for the lifetime of the threads).

Randy Rizun
Really? Most examples I've read show it being destroyed early, after its value has been passed along...
Nicholas Mancuso
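On the pthread_attr_destroy question: POSIX states that destroying a thread attributes object has no effect on threads that were already created with it, which matches the examples that destroy it right after pthread_create returns. A small sketch of that pattern follows; worker and run_worker_once are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical thread body: bumps the int it is given and returns it. */
static void *worker(void *arg)
{
    int *value = arg;
    *value += 1;
    return arg;
}

/* Hypothetical helper: creates one joinable thread, destroys the
   attributes object immediately, then joins the thread.
   Returns 0 on success, a pthread error code or -1 otherwise. */
int run_worker_once(int *value)
{
    pthread_attr_t attr;
    pthread_t tid;
    void *result = NULL;
    int rc;

    assert(value != NULL);

    rc = pthread_attr_init(&attr);
    if (rc != 0)
        return rc;
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    rc = pthread_create(&tid, &attr, worker, value);

    /* The attributes are only consulted during pthread_create, so the
       object can be released here while the thread keeps running. */
    pthread_attr_destroy(&attr);
    if (rc != 0)
        return rc;

    rc = pthread_join(tid, &result);
    if (rc != 0)
        return rc;
    return result == value ? 0 : -1;
}
```

If this pattern behaves correctly on your platform, the early pthread_attr_destroy call is unlikely to be the source of the random errors, and the queue code is the more probable place to look.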