I'm trying to write a simple thread pool program with pthreads. However, it seems that pthread_cond_signal doesn't block, which creates a problem. For example, let's say I have a "producer-consumer" program:

#include <pthread.h>
#include <unistd.h>

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;

void * liberator(void * arg)
{
    // XXX make sure he is ready to be freed
    sleep(1);

    pthread_mutex_lock(&my_cond_m);
    pthread_cond_signal(&my_cond);
    pthread_mutex_unlock(&my_cond_m);

    return NULL;
}

int main()
{
    pthread_t t1;
    pthread_create(&t1, NULL, liberator, NULL);

    // XXX Don't take too long to get ready. Otherwise I'll miss 
    // the wake up call forever
    //sleep(3);

    pthread_mutex_lock(&my_cond_m);
    pthread_cond_wait(&my_cond, &my_cond_m);
    pthread_mutex_unlock(&my_cond_m);

    pthread_join(t1, NULL);

    return 0;
}

As the two XXX comments note, if I remove the sleep call in liberator() (or enable the one in main()), main() may stall forever because it has missed the wake-up call from liberator(). Of course, sleep isn't a robust way to guarantee the ordering either.

In a real-life situation, this would be a worker thread telling the manager thread that it is ready for work, or the manager thread announcing that new work is available.

How would you do this reliably in pthread?


Elaboration

@Borealid's answer works, but his explanation of the problem could be better. I suggest that anyone looking at this question read the discussion in the comments to understand what's going on.

In particular, I would amend his answer and code example like this to make it clearer. (Borealid's original answer compiled and worked, but it confused me a lot.)

// In main
pthread_mutex_lock(&my_cond_m);

// If the flag is not set, it means liberator has not 
// been run yet. I'll wait for him through pthread's signaling 
// mechanism

// If it _is_ set, it means liberator has been run. I'll simply 
// skip waiting since I've already synchronized. I don't need to 
// use pthread's signaling mechanism
if(!flag) pthread_cond_wait(&my_cond, &my_cond_m);

pthread_mutex_unlock(&my_cond_m);

// In liberator thread
pthread_mutex_lock(&my_cond_m);

// Signal anyone who's sleeping. If no one is sleeping yet, 
// they should check this flag, which indicates I have already 
// sent the signal. This is needed because a pthread condition 
// is not like a message queue -- a sent signal is lost if 
// nobody is waiting on the condition when it's sent.
// You can think of this flag as a "persistent" signal
flag = 1;
pthread_cond_signal(&my_cond);
pthread_mutex_unlock(&my_cond_m);
+4  A: 

Use a synchronization variable.

In main:

pthread_mutex_lock(&my_cond_m);
while (!flag) {
    pthread_cond_wait(&my_cond, &my_cond_m);
}
pthread_mutex_unlock(&my_cond_m);

In the thread:

pthread_mutex_lock(&my_cond_m);
flag = 1;
pthread_cond_broadcast(&my_cond);
pthread_mutex_unlock(&my_cond_m);
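
To see that the flag covers both orderings, the two fragments above can be dropped into a small harness. This is only a sketch under assumptions: `run_once`, `sleep_ms`, and the delay parameters are made-up names for illustration, and the delays merely make one ordering or the other likely, they do not enforce it.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <time.h>

static pthread_cond_t  my_cond   = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;
static int flag = 0;   /* the condition itself; protected by my_cond_m */

/* Hypothetical helper: sleep for roughly ms milliseconds. */
static void sleep_ms(long ms)
{
    struct timespec ts;
    ts.tv_sec  = ms / 1000;
    ts.tv_nsec = (ms % 1000) * 1000000L;
    nanosleep(&ts, NULL);
}

static void *liberator(void *arg)
{
    long delay_ms = *(long *)arg;
    sleep_ms(delay_ms);               /* arrive early or late */

    pthread_mutex_lock(&my_cond_m);
    flag = 1;                         /* record the event first ... */
    pthread_cond_broadcast(&my_cond); /* ... then wake whoever is waiting */
    pthread_mutex_unlock(&my_cond_m);
    return NULL;
}

/* Runs one round and returns the flag value the waiter observed,
 * or -1 if the thread could not be created. */
int run_once(long liberator_delay_ms, long waiter_delay_ms)
{
    pthread_t t;
    int seen;

    flag = 0;                         /* no other thread exists yet */
    if (pthread_create(&t, NULL, liberator, &liberator_delay_ms) != 0)
        return -1;

    sleep_ms(waiter_delay_ms);

    pthread_mutex_lock(&my_cond_m);
    while (!flag)                     /* skipped entirely if liberator ran first */
        pthread_cond_wait(&my_cond, &my_cond_m);
    assert(flag);                     /* the wait only ends once the flag is set */
    seen = flag;
    pthread_mutex_unlock(&my_cond_m);

    pthread_join(t, NULL);
    return seen;
}
```

Calling `run_once(0, 50)` makes it likely that the liberator signals before anyone waits, and `run_once(50, 0)` makes the opposite likely; the call returns in both cases because the waiter tests `flag` under the mutex before deciding to sleep.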

For a producer-consumer problem, this would be the consumer sleeping when the buffer is empty, and the producer sleeping when it is full. Remember to acquire the lock before accessing the global variable.
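
As a rough illustration of that last paragraph, here is a minimal bounded-buffer sketch. It is not taken from the question: `CAPACITY`, `buffer_put`, `buffer_get`, `run_transfer`, and the use of two separate condition variables are all assumptions made for the example.

```c
#include <assert.h>
#include <pthread.h>

#define CAPACITY 4   /* hypothetical buffer size */

static int buffer[CAPACITY];
static int count = 0, head = 0, tail = 0;   /* all protected by buf_m */
static pthread_mutex_t buf_m     = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  not_full  = PTHREAD_COND_INITIALIZER;

/* Producer side: sleep while the buffer is full. */
void buffer_put(int value)
{
    pthread_mutex_lock(&buf_m);
    while (count == CAPACITY)
        pthread_cond_wait(&not_full, &buf_m);
    buffer[tail] = value;
    tail = (tail + 1) % CAPACITY;
    count++;
    pthread_cond_signal(&not_empty);   /* a consumer may proceed now */
    pthread_mutex_unlock(&buf_m);
}

/* Consumer side: sleep while the buffer is empty. */
int buffer_get(void)
{
    int value;

    pthread_mutex_lock(&buf_m);
    while (count == 0)
        pthread_cond_wait(&not_empty, &buf_m);
    value = buffer[head];
    head = (head + 1) % CAPACITY;
    count--;
    pthread_cond_signal(&not_full);    /* a producer may proceed now */
    pthread_mutex_unlock(&buf_m);
    return value;
}

static void *producer_thread(void *arg)
{
    int n = *(int *)arg;
    int i;

    for (i = 1; i <= n; i++)
        buffer_put(i);
    return NULL;
}

/* Sends 1..n through the buffer and returns the sum the consumer saw,
 * or -1 if the producer thread could not be created. */
long run_transfer(int n)
{
    pthread_t t;
    long sum = 0;
    int i;

    if (pthread_create(&t, NULL, producer_thread, &n) != 0)
        return -1;
    for (i = 0; i < n; i++)
        sum += buffer_get();
    pthread_join(t, NULL);
    return sum;
}
```

Both waits sit in a `while` loop that re-tests the shared state under the lock, so neither side depends on being asleep at the moment the other one signals.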

Borealid
Does `pthread_cond_broadcast` block? The problem I had was that the worker thread (`liberator`) does *not block*, so `main` would miss the `pthread_cond_signal` (it is not a message queue -- it's just one shot to catch anyone waiting). I don't really understand why adding the flag helps -- after all, `liberator` still fires one shot, and if `main` hasn't even reached the new `while` loop by that time (and thus isn't waiting yet), it will miss the wake-up call forever (since the thread won't fire it again). What am I missing?
kizzx2
I added my own answer. While it also involves an extra variable and a while loop, those aren't the heart of the problem. IMO, this answer doesn't explain the key point: the worker thread does not sleep (which it should) in the original program. This answer's code is also misleading because adding a `while` loop to the wait condition clearly doesn't help -- the original problem was `main` stuck in a sleep!
kizzx2
@kizzx2: I think you're not really understanding. Your actual problem is that your code does not proceed in the event that `liberator` runs before `main` reaches its `pthread_cond_wait` call. The solution is for `main` to wait **only** in the event that some condition (which liberator makes true) is not yet true. You don't want to wait for the sake of waiting; you must be waiting for something. The purpose of the `while` loop (instead of an `if`) is just to go back to sleep if you're woken up extraneously.
Borealid
@Borealid: In this particular case, there is only one thread, `liberator`, which makes exactly one `pthread_cond_signal` call -- there is nobody else to make the "extraneous" wake-up call. `main` should be more concerned that it _won't_ wake up at all than about being woken up extraneously. As you said, the problem is that `liberator` sends the signal **before** `main` is waiting. How can adding a `while` loop and a flag solve the problem? If `liberator` sends the signal before `main` even has the chance to check the flag or enter the while loop, the program halts all the same.....
kizzx2
@Borealid: The solution, therefore, from first principles, is that `liberator` is responsible for **making sure** that the `pthread_cond_signal` *will* reach someone waiting, instead of getting lost in thin air. As you put it (in words, not in code), when `liberator` is about to send the signal, it must check whether `main` is sleeping or not. If `main` is sleeping, `liberator` can proceed to send the signal. If `main` is **not** yet waiting, `liberator` must wait for `main` to call `pthread_cond_wait` first. This "waiting" action can be achieved by several means, such as a traditional `while` loop (polling)..
kizzx2
@kizzx2: No, you're still not seeing it. If `liberator` sends the signal first, it has **also** set the flag. So `main` won't enter the loop at all. You're obviously not going to be satisfied with my explanations - just try running the code I gave with your original example. Start `flag` at zero. It will never, ever, ever deadlock.
Borealid
@Borealid: .. however, polling is not efficient and wastes precious CPU cycles, that's why `pthread_cond_wait` was invented in the first place. Therefore, the solution to this problem seems to be that **both** `liberator` and `main` must know how to "sleep" and "signal the other person" in **both** code paths (a. `liberator` reaches the "checkpoint" first; b. `main` reaches the "checkpoint" first)
kizzx2
@kizzx2: There is still no polling in the answer I gave. `main` checks the flag at most twice, except in the event that it is woken up spuriously. If you're so convinced you know best, why are you asking a question?
Borealid
@Borealid: Oh I see. That's correct :) I suggest as an answer for this question: Your last comment be added to the main body -- that is the "real" answer (the one that starts with "you're still not seeing it"). I also don't think the change from `pthread_cond_signal` to `pthread_cond_broadcast` is necessary in this particular case. Also an `if` check is enough because there is only one person to make the wake up call (makes the solution clearer about what it's solving)
kizzx2
@Borealid: I didn't mean to say I know best, I'm discussing with my understanding (which can be wrong). That's the reason I asked the question.
kizzx2
+1  A: 

I found out the solution here. For me, the tricky bit to understand the problem is that:

  1. Producers and consumers must be able to communicate both ways. One direction alone is not enough.
  2. This two-way communication can be packed into one pthread condition.

To illustrate, the blog post mentioned above demonstrated that this is actually meaningful and desirable behavior:

pthread_mutex_lock(&cond_mutex);
pthread_cond_broadcast(&cond);
pthread_cond_wait(&cond, &cond_mutex);
pthread_mutex_unlock(&cond_mutex);

The idea is that if both the producers and consumers employ this logic, it is safe for either of them to go to sleep first, since each will be able to wake the other role up. Put another way, in a typical producer-consumer scenario, if a consumer needs to sleep, it's because a producer needs to wake up, and vice versa. Packing this logic into a single pthread condition makes sense.

Of course, the above code has the unintended behavior that a worker thread will also wake up another sleeping worker thread when it actually just wants to wake the producer. This can be solved by a simple variable check as @Borealid suggested:

while(!work_available) pthread_cond_wait(&cond, &cond_mutex);

Upon a broadcast, all worker threads are awakened, but they proceed one by one (because pthread_cond_wait implicitly re-acquires the mutex before returning). Since one of the worker threads consumes the work (setting work_available back to false), when the other worker threads wake and re-check the condition, the work is no longer available, so they go back to sleep.
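
The wake-everyone-but-only-one-proceeds behavior can be isolated in a stripped-down sketch. The names `run_pool`, `NUM_WORKERS`, `done`, and `consumed` are invented for this illustration, and the "work" is reduced to bumping a counter while the mutex is held, which a real pool would not do.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_WORKERS 4   /* hypothetical pool size */

static pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond       = PTHREAD_COND_INITIALIZER;
static int work_available = 0;   /* 1 while one item waits to be taken */
static int done = 0;             /* set once no more work will arrive */
static int consumed = 0;         /* items taken so far, all workers combined */

static void *pool_worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&cond_mutex);
    for (;;) {
        /* Every broadcast wakes us, so re-check before proceeding. */
        while (!work_available && !done)
            pthread_cond_wait(&cond, &cond_mutex);
        if (!work_available)
            break;                     /* done, and nothing left to take */
        work_available = 0;            /* claim the item */
        consumed++;
        pthread_cond_broadcast(&cond); /* lets the producer post the next one */
    }
    pthread_mutex_unlock(&cond_mutex);
    return NULL;
}

/* Posts `items` units of work one at a time; returns how many were consumed,
 * or -1 if a worker thread could not be created. */
int run_pool(int items)
{
    pthread_t workers[NUM_WORKERS];
    int i, started = 0, total;

    work_available = done = consumed = 0;
    for (i = 0; i < NUM_WORKERS; i++) {
        if (pthread_create(&workers[i], NULL, pool_worker, NULL) != 0)
            break;
        started++;
    }

    pthread_mutex_lock(&cond_mutex);
    for (i = 0; started == NUM_WORKERS && i < items; i++) {
        while (work_available)         /* previous item not taken yet */
            pthread_cond_wait(&cond, &cond_mutex);
        work_available = 1;
        pthread_cond_broadcast(&cond); /* wakes all workers; one wins */
    }
    while (work_available)             /* let the last item be taken */
        pthread_cond_wait(&cond, &cond_mutex);
    done = 1;
    pthread_cond_broadcast(&cond);     /* release workers that are still waiting */
    pthread_mutex_unlock(&cond_mutex);

    for (i = 0; i < started; i++)
        pthread_join(workers[i], NULL);
    total = consumed;                  /* safe to read: all workers have exited */
    return started == NUM_WORKERS ? total : -1;
}
```

With four workers and ten items, `run_pool(10)` should report exactly ten consumed items: each broadcast wakes every sleeping worker, but only the first one to re-acquire the mutex finds `work_available` set.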

Here's some commented code I tested, for anyone interested:

// gcc -Wall -pthread threads.c -lpthread

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;

int * next_work = NULL;
int all_work_done = 0;

void * worker(void * arg)
{
    int * my_work = NULL;

    while(!all_work_done)
    {
        pthread_mutex_lock(&my_cond_m);

        if(next_work == NULL)
        {
            // Signal producer to give work
            pthread_cond_broadcast(&my_cond);

            // Wait for work to arrive
            // It is wrapped in a while loop because the condition 
            // might be triggered by another worker thread intended 
            // to wake up the producer
            while(!next_work && !all_work_done)
                pthread_cond_wait(&my_cond, &my_cond_m);
        }

        // Work has arrived, cache it locally so producer can 
        // put in next work ASAP
        my_work = next_work;
        next_work = NULL;
        pthread_mutex_unlock(&my_cond_m);

        if(my_work)
        {
            printf("Worker %d consuming work: %d\n", (int)(pthread_self() % 100), *my_work);
            free(my_work);
        }
    }

    return NULL;
}

int * create_work()
{
    int * ret = (int *)malloc(sizeof(int));
    assert(ret);
    *ret = rand() % 100;
    return ret;
}

void * producer(void * arg)
{
    int i;

    for(i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&my_cond_m);
        while(next_work != NULL)
        {
            // There's still work, signal a worker to pick it up
            pthread_cond_broadcast(&my_cond);

            // Wait for work to be picked up
            pthread_cond_wait(&my_cond, &my_cond_m);
        }

        // No work is available now, let's put work on the queue
        next_work = create_work();
        printf("Producer: Created work %d\n", *next_work);

        pthread_mutex_unlock(&my_cond_m);
    }

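    // Wait for the last item to be picked up, then release the workers.
    // all_work_done is set under the mutex and before the final broadcast,
    // so no worker can test the flag and go to sleep after the last wake-up.
    pthread_mutex_lock(&my_cond_m);
    while(next_work != NULL)
    {
        pthread_cond_broadcast(&my_cond);
        pthread_cond_wait(&my_cond, &my_cond_m);
    }
    all_work_done = 1;
    pthread_cond_broadcast(&my_cond);
    pthread_mutex_unlock(&my_cond_m);

    return NULL;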
}

int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_create(&t3, NULL, worker, NULL);
    pthread_create(&t4, NULL, worker, NULL);

    producer(NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    return 0;
}
kizzx2