I am attempting a multiple producer/consumer problem in C, but it's not working as expected. The following pseudo-code represents my implementation.

Thread thread1;
Thread thread2;
Thread thread3;

Data data1;
Mutex data1_mutex;
Semaphore data1_empty;
Semaphore data1_fill;

Data data2;
Mutex data2_mutex;
Semaphore data2_empty;
Semaphore data2_fill;

thread1()
{
   // creates data and places it into data1.

   wait(data1_empty);
   lock(data1_mutex);

   // critical section

   unlock(data1_mutex);
   post(data1_fill);
}

thread2()
{
   // Removes data from data1, processes it, and places it into data2.

   // data1
   wait(data1_fill);
   lock(data1_mutex);

   // data2
   wait(data2_empty);
   lock(data2_mutex);

   // critical section

   // data2
   unlock(data2_mutex);
   post(data2_fill);

   // data1
   unlock(data1_mutex);
   post(data1_empty);
}

thread3()
{
   // Removes data from data2, prints its results, and removes it.

   wait(data2_fill);
   lock(data2_mutex);

   // critical section

   unlock(data2_mutex);
   post(data2_empty);
}

However, with this solution data1 fills up, but thread2 blocks and never runs. Is there something wrong with my implementation?

EDIT #1

One of the problems I found was that my second mutex was not being created correctly. I don't know what's wrong with it so I'm just using the first mutex for all threads. There is also something else I have done to get it working, so I'll update my pseudo-code to reflect this later when I have a minute.

+1  A: 

Make sure you post data1_empty and data2_empty initially.
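
As a rough illustration of what "posted initially" can mean with POSIX semaphores: the "empty" semaphore starts at the number of free slots and the "fill" semaphore starts at zero. The helper name `init_slot_semaphores` and its `capacity` parameter are made up for this sketch, and it assumes a platform where unnamed semaphores (`sem_init`) are supported, such as Linux.

```c
#include <assert.h>
#include <semaphore.h>

/* Hypothetical helper: set up the empty/fill pair for a buffer with
 * `capacity` slots. Returns 0 on success, -1 if sem_init fails. */
int init_slot_semaphores(sem_t *empty, sem_t *fill, unsigned int capacity)
{
    assert(capacity > 0);  /* a zero-slot buffer would block the producer forever */

    /* Every slot is free at start, so the producer's first wait succeeds. */
    if (sem_init(empty, 0, capacity) != 0)
        return -1;

    /* Nothing is filled yet, so the consumer blocks until the first post. */
    if (sem_init(fill, 0, 0) != 0) {
        sem_destroy(empty);
        return -1;
    }
    return 0;
}
```

If the "empty" count starts higher than the real number of slots, or the "fill" semaphore is also posted up front, the producer can write more items than the buffer holds.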

wrang-wrang
Do you mean before the threads are spun?
Nippysaurus
I don't think it matters. My point is: the code you've shown does look correct, provided the "empty" semaphores are initially posted. You can also use a debugger with threads and see where each one is stuck.
wrang-wrang
I don't have an IDE or debugger at the moment. I have tried posting to both semaphores before spinning up the threads, but it results in data1 setting more values than it has space for, so I don't think that's a solution?
Nippysaurus
A: 

What wrang-wrang said, and try not to hold the lock for data1 while you're waiting for data2_empty. You could achieve this by keeping an alternate buffer for data1 and data2 that you swap out. thread2 swaps out data1 while processing it into data2, and thread3 swaps out data2 while processing it. Your current pseudo-code will allow thread1 and thread3 to run concurrently, but it won't allow thread2 to execute at the same time as either of the others.
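
One way to avoid holding the data1 lock while waiting on data2_empty, sketched in C with pthreads and POSIX semaphores. This is a simpler variant than buffer swapping: thread2 copies the item into a local variable and fully releases data1 before touching data2. The `Slot` type, the helpers `must`, `slot_put`, `slot_take`, `run_pipeline`, the `ITEMS` count, and the "multiply by 2" processing step are all assumptions made up for the example.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define ITEMS 5  /* hypothetical number of items pushed through the pipeline */

/* One-slot buffer: a mutex plus the empty/fill semaphore pair. */
typedef struct {
    int value;
    pthread_mutex_t mutex;
    sem_t empty;  /* free slots, starts at 1 */
    sem_t fill;   /* filled slots, starts at 0 */
} Slot;

static Slot data1, data2;
static int results[ITEMS];  /* what thread3 received, in order */

/* Sketch-level error handling: treat any failing call as fatal. */
static void must(int rc) { assert(rc == 0); (void)rc; }

static void slot_init(Slot *s)
{
    must(pthread_mutex_init(&s->mutex, NULL));
    must(sem_init(&s->empty, 0, 1));
    must(sem_init(&s->fill, 0, 0));
}

static void slot_put(Slot *s, int v)
{
    must(sem_wait(&s->empty));           /* wait for a free slot */
    must(pthread_mutex_lock(&s->mutex));
    s->value = v;
    must(pthread_mutex_unlock(&s->mutex));
    must(sem_post(&s->fill));            /* announce the new item */
}

static int slot_take(Slot *s)
{
    must(sem_wait(&s->fill));            /* wait for an item */
    must(pthread_mutex_lock(&s->mutex));
    int v = s->value;                    /* copy out while locked */
    must(pthread_mutex_unlock(&s->mutex));
    must(sem_post(&s->empty));           /* slot is free again */
    return v;
}

static void *thread1(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITEMS; i++)
        slot_put(&data1, i);
    return NULL;
}

static void *thread2(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITEMS; i++) {
        int item = slot_take(&data1);  /* data1 is fully released here */
        slot_put(&data2, item * 2);    /* only now wait on data2 */
    }
    return NULL;
}

static void *thread3(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITEMS; i++)
        results[i] = slot_take(&data2);
    return NULL;
}

/* Runs the three-stage pipeline to completion. */
void run_pipeline(void)
{
    pthread_t t[3];
    void *(*fn[3])(void *) = { thread1, thread2, thread3 };

    slot_init(&data1);
    slot_init(&data2);
    for (int i = 0; i < 3; i++)
        must(pthread_create(&t[i], NULL, fn[i], NULL));
    for (int i = 0; i < 3; i++)
        must(pthread_join(t[i], NULL));
}
```

Because thread2 never holds both locks at once, thread1 can refill data1 while thread2 is still blocked on data2, and no lock-ordering deadlock is possible between the two buffers.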

Mark Pauley
+1  A: 

If you use some sort of queuing type for Data, you should be able to remove the "empty" semaphores completely, unless you're trying to impose the condition that each Data queue depth is strictly 0 or 1. If you use a local variable in thread2, you can reduce the size of the critical section.

The code then becomes something like this:

thread1() {
    //Wait for data to put in the queue (i.e. a blocking socket read)
    lock(data1_mutex);
    data1.push(someData);
    unlock(data1_mutex);
    post(data1_fill);
}

thread2() {
    DataType dataElement;
    wait(data1_fill);
    lock(data1_mutex);
    dataElement = data1.pop();
    unlock(data1_mutex);

    lock(data2_mutex);
    data2.push(dataElement);
    unlock(data2_mutex);
    post(data2_fill);
}

thread3() {
    DataType dataElement;
    wait(data2_fill);
    lock(data2_mutex);
    dataElement = data2.pop();
    //do something with dataElement here
    unlock(data2_mutex);
}
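
A possible C rendering of the queue idea above, assuming an unbounded linked-list queue of `int` values guarded by a mutex and a single "fill" semaphore. The `Queue` and `Node` types and the `queue_init`, `queue_push`, and `queue_pop` names are invented for this sketch. Here `queue_pop` returns a copy, so the caller can work on the value after the mutex has been released.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>

typedef struct Node {
    int value;
    struct Node *next;
} Node;

typedef struct {
    Node *head, *tail;
    pthread_mutex_t mutex;  /* protects head and tail */
    sem_t fill;             /* counts queued items; no "empty" semaphore needed */
} Queue;

/* Returns 0 on success, nonzero on failure. */
int queue_init(Queue *q)
{
    q->head = q->tail = NULL;
    if (pthread_mutex_init(&q->mutex, NULL) != 0)
        return -1;
    return sem_init(&q->fill, 0, 0);
}

/* Appends a value; never blocks on capacity. Returns 0 on success. */
int queue_push(Queue *q, int value)
{
    Node *n = malloc(sizeof *n);
    if (n == NULL)
        return -1;
    n->value = value;
    n->next = NULL;

    pthread_mutex_lock(&q->mutex);
    if (q->tail != NULL)
        q->tail->next = n;
    else
        q->head = n;
    q->tail = n;
    pthread_mutex_unlock(&q->mutex);

    sem_post(&q->fill);  /* wake one waiting consumer */
    return 0;
}

/* Blocks until an item is available, then returns a copy of it. */
int queue_pop(Queue *q)
{
    sem_wait(&q->fill);

    pthread_mutex_lock(&q->mutex);
    Node *n = q->head;
    assert(n != NULL);  /* a successful wait on fill implies a queued node */
    q->head = n->next;
    if (q->head == NULL)
        q->tail = NULL;
    pthread_mutex_unlock(&q->mutex);

    int value = n->value;  /* the node is private now, no lock needed */
    free(n);
    return value;
}
```

With this shape, thread2 reduces to `queue_push(&data2, process(queue_pop(&data1)))` for some hypothetical `process` step, and neither mutex is held during processing.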
Adam
If .pop() makes a copy, then thread3 can unlock(data2_mutex) before using dataElement. If it's by ref, then thread2 can't unlock data1_mutex until it has copied dataElement into data2. (push and pop can't both be by ref, I think.)
Peter Cordes