Hey everyone. I've been working for a while on some code that spawned threads as needed, but decided that a simpler and more effective solution would be a thread pool. It's implemented with a queue I wrote that uses condition-variable waits on enqueue and dequeue. The only reason I'm posting this is that I'm getting weird errors at random places throughout my code that never happened before the switch to the thread pool, and they go away when I add debug print statements. If my code starts working because of print statements, that suggests a memory or stack problem, possibly caused by bad threading code.
I figured the first place to check for correctness and thread-safety would be the thread pool. Here are the three main functions: threadstart is the function each thread sits in, waiting on the dequeue; threadpool_init spawns the threads; and threadpool_q_workitem queues up a work item. The q_enq function signals the condition variable that wakes up the waiting threads, which then dequeue.
/*
 * Entry point for every pool thread.
 *
 * arg is the threadpool_t the thread belongs to.  The thread repeatedly
 * pulls a work item off the pool's queue, invokes its action with its
 * argument, and frees the item itself (the item's arg is NOT freed here).
 * A NULL result from q_dq() ends the loop and the thread exits.
 */
void *
threadstart(void *arg)
{
    threadpool_t *pool = (threadpool_t *)arg;
    workitem_t *job;

    /* q_dq() blocks until something is queued; NULL means "stop". */
    while ((job = q_dq(pool->workqueue)) != NULL) {
        (*job->action)(job->arg);
        free(job);
    }

    pthread_exit(NULL);
}
/*
 * Create a thread pool and start its worker threads.
 *
 * max_threads   number of worker threads to start; must be > 0
 * max_workload  handed to q_init() when creating the work queue; must be > 0
 *
 * Returns the new pool, or NULL if the thread attributes, the pool
 * structure, the thread table or the work queue could not be set up.
 *
 * A failure to create an individual worker thread is reported on stdout and
 * otherwise ignored: the pool is still returned, and the corresponding
 * tp->threads[] slot keeps the zeroed value calloc() gave it.
 */
threadpool_t *
threadpool_init(int max_threads, int max_workload)
{
    threadpool_t *tp;
    pthread_attr_t attr;
    int i;
    int rc;

    ASSERT(max_threads > 0 && max_workload > 0);

    if (pthread_attr_init(&attr) != 0) {
        return NULL;
    }

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        goto fail_attr;
    }
    tp->max_threads = max_threads;

    tp->threads = calloc(max_threads, sizeof(pthread_t));
    if (tp->threads == NULL) {
        goto fail_pool;
    }

    /* NOTE(review): assumes q_init() signals failure by returning NULL -- confirm. */
    tp->workqueue = q_init(max_workload);
    if (tp->workqueue == NULL) {
        goto fail_threads;
    }

    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    /*
     * NOTE(review): a scheduling policy stored in the attribute object is
     * only honoured when the inherit-scheduler attribute is
     * PTHREAD_EXPLICIT_SCHED; that is never set here, so this call may have
     * no effect -- confirm what is intended.
     */
    pthread_attr_setschedpolicy(&attr, SCHED_RR);

    for (i = 0; i < max_threads; i++) {
        rc = pthread_create(&tp->threads[i], &attr, threadstart, tp);
        /* worry about errors creating threads later :( */
        if (rc) {
            printf("Error creating threadpool thread %d [%d]\r\n", i, rc);
        }
    }

    pthread_attr_destroy(&attr);
    return tp;

fail_threads:
    free(tp->threads);
fail_pool:
    free(tp);
fail_attr:
    pthread_attr_destroy(&attr);
    return NULL;
}
/*
 * Queue one unit of work on the pool.
 *
 * tp      pool to queue on; must not be NULL
 * action  function a worker thread will call
 * arg     opaque argument passed to action; ownership stays with the caller
 *         (the pool only frees the work item wrapper, never arg)
 *
 * The call goes through q_enq(), which waits while the queue is full.
 * If the work item cannot be allocated, the failure is reported on stderr
 * and the request is dropped instead of dereferencing a NULL pointer.
 */
void
threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg)
{
    workitem_t *item;

    ASSERT(tp != NULL);

    item = malloc(sizeof *item);
    if (item == NULL) {
        fprintf(stderr, "threadpool_q_workitem: out of memory, work item dropped\n");
        return;
    }

    item->action = action;
    item->arg = arg;
    q_enq(tp->workqueue, item);
}
EDIT: Queue Functions
/*
 * Append data to the circular buffer of q.
 *
 * While the queue is marked full, the caller sleeps on q->notfull in
 * Q_TIMEOUT-second slices, re-checking the flag after every wake-up or
 * timeout.  Once there is room the element is stored at the tail, the tail
 * index wraps at q->num, and the full/empty flags are updated.  One waiter
 * on q->notempty is signalled after the mutex has been released.
 */
void q_enq(struct queue *q, void *data) {
    pthread_mutex_lock(q->mut);

    while (q->full) {
        struct timeval tv;
        struct timespec deadline;

        /* Absolute deadline: now + Q_TIMEOUT seconds. */
        gettimeofday(&tv, NULL);
        deadline.tv_sec = tv.tv_sec + Q_TIMEOUT;
        deadline.tv_nsec = tv.tv_usec * 1000;
        pthread_cond_timedwait(q->notfull, q->mut, &deadline);
    }

    /* Store at the tail, then advance it with wrap-around. */
    q->buffer[q->tail] = data;
    q->tail++;
    if (q->tail == q->num) {
        q->tail = 0;
    }

    /* Tail caught up with head after an insert: no free slot remains. */
    if (q->tail == q->head) {
        q->full = 1;
    }
    q->empty = 0;

    pthread_mutex_unlock(q->mut);
    pthread_cond_signal(q->notempty);
}
/*
 * Remove and return the element at the head of q.
 *
 * While the queue is marked empty, the caller sleeps on q->notempty in
 * Q_TIMEOUT-second slices.  If q->finished is set, either before a wait is
 * started or when a wait returns, the mutex is released and NULL is
 * returned.  Note that the check after the wait happens before the empty
 * flag is re-tested, so NULL is returned in that case even if an element
 * was queued in the meantime.
 *
 * Otherwise the head element is taken, the head index wraps at q->num, the
 * full/empty flags are updated, and one waiter on q->notfull is signalled
 * after the mutex has been released.
 */
void *q_dq(struct queue *q) {
    void *item;

    pthread_mutex_lock(q->mut);

    while (q->empty) {
        struct timeval tv;
        struct timespec deadline;

        /* Absolute deadline: now + Q_TIMEOUT seconds. */
        gettimeofday(&tv, NULL);
        deadline.tv_sec = tv.tv_sec + Q_TIMEOUT;
        deadline.tv_nsec = tv.tv_usec * 1000;

        /* Only sleep if shutdown has not been requested yet. */
        if (!q->finished) {
            pthread_cond_timedwait(q->notempty, q->mut, &deadline);
        }
        /* Shutdown seen either before or right after the wait. */
        if (q->finished) {
            pthread_mutex_unlock(q->mut);
            return NULL;
        }
    }

    /* Take from the head, then advance it with wrap-around. */
    item = q->buffer[q->head];
    q->head++;
    if (q->head == q->num) {
        q->head = 0;
    }

    /* Head caught up with tail after a removal: nothing left to read. */
    if (q->head == q->tail) {
        q->empty = 1;
    }
    q->full = 0;

    pthread_mutex_unlock(q->mut);
    pthread_cond_signal(q->notfull);
    return item;
}