(In short: main()'s WaitForSingleObject hangs in the program below).

I'm trying to write a piece of code that dispatches threads and waits for them to finish before it resumes. Instead of creating the threads every time, which is costly, I put them to sleep. The main thread creates X threads in CREATE_SUSPENDED state.

The synch is done with a semaphore with X as MaximumCount. The semaphore's count is brought down to zero and the threads are dispatched. The threads perform some silly loop and call ReleaseSemaphore before they go to sleep. Then the main thread uses WaitForSingleObject X times to be sure every thread finished its job and is sleeping. Then it loops and does it all again.

From time to time the program does not exit. When I break into the program, I can see that WaitForSingleObject is hanging. This means that a thread's ReleaseSemaphore did not work. Nothing is printf'ed, so supposedly nothing went wrong.

Maybe two threads shouldn't call ReleaseSemaphore at the exact same time, but that would nullify the purpose of semaphores...

I just don't grok it...

Other solutions to synch threads are gratefully accepted!

#include <windows.h>
#include <stdio.h>
#include <math.h>

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done\n");
 getc(stdin);
}
A:

I don't understand the code, but the threading sync is definitely bad. You assume that threads will call SuspendThread() in a certain order. A successful WaitForSingleObject() call doesn't tell you which thread called ReleaseSemaphore(). You'll thus call ResumeThread() on a thread that wasn't suspended. This quickly deadlocks the program.

Another bad assumption is that a thread has already called SuspendThread() by the time WaitForSingleObject() returns. Usually yes, not always. The thread could be pre-empted right after the ReleaseSemaphore() call. You'll again call ResumeThread() on a thread that wasn't suspended, which has no effect, and the thread then suspends itself with nobody left to resume it. That one usually takes a day or so to deadlock your program.

And I think there's one ReleaseSemaphore call too many. Trying to unwedge it, no doubt.

You cannot control threading with SuspendThread()/ResumeThread(), don't try.
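The root cause is that a ResumeThread() aimed at a thread that is not suspended yet is simply lost. A semaphore (or any other stateful signal) does not have that problem: a release that arrives before the matching wait is kept as a count. The sketch below illustrates this in portable standard C++11 (std::thread, std::mutex, std::condition_variable) rather than Win32; CountingSemaphore and early_signal_is_not_lost are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper (not a Win32 API): a counting semaphore built from
// std::mutex + std::condition_variable. A release() that happens before
// the matching acquire() is stored in count_, so it cannot be lost.
class CountingSemaphore {
public:
    explicit CountingSemaphore(int initial = 0) : count_(initial) {}

    void release(int n = 1) {
        {
            std::lock_guard<std::mutex> lock(m_);
            count_ += n;
        }
        cv_.notify_all();
    }

    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });  // block while count is 0
        --count_;
    }

    int count() {
        std::lock_guard<std::mutex> lock(m_);
        return count_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// Reproduces the ordering that breaks SuspendThread/ResumeThread: the
// "wake up" is sent before the worker goes to sleep. With a semaphore the
// worker's later wait returns at once. True if the worker got through.
bool early_signal_is_not_lost() {
    CountingSemaphore go(0);
    go.release();                 // the signal arrives first...
    bool passed = false;
    std::thread worker([&] {
        go.acquire();             // ...and the later wait does not block
        passed = true;
    });
    worker.join();
    return passed && go.count() == 0;
}
```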

Hans Passant
Well, I don't assume any order for SuspendThread. The innermost loop actually resumes the threads in an arbitrary order, but then I call WaitForSingleObject numCPU times, which waits for numCPU ReleaseSemaphore() calls, which may happen in any order.
Gabriel
@Gabriel: I don't think you are going to solve this until you see that calling ResumeThread on a thread that isn't suspended is your main problem.
Hans Passant
I had failed to understand your second paragraph. Now I get it; that's the same idea as stmax's. Thanks for the explanation!
Gabriel
I managed a solution using only semaphores, not suspending/releasing threads. You were right, then. Thanx \o/
Gabriel
A: 

Instead of using a semaphore (at least directly) or having main explicitly wake up a thread to get some work done, I've always used a thread-safe queue. When main wants a worker thread to do something, it pushes a description of the job to be done onto the queue. The worker threads each just do a job, then try to pop another job from the queue, and end up blocked until there's a job in the queue for them to do.

The code for the queue looks like this:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif
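For comparison, here is a rough portable sketch of the same bounded queue in standard C++11, assuming std::mutex in place of the critical section and two std::condition_variable objects in place of the space_avail/data_avail semaphores. BoundedQueue and sum_through_queue are hypothetical names, not part of the answer above.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Same ring buffer as queue<T, max>, but the "space available" and
// "data available" semaphores become condition variables plus a size_ count.
template <class T, std::size_t Max = 256>
class BoundedQueue {
public:
    void push(T data) {
        std::unique_lock<std::mutex> lock(m_);
        space_avail_.wait(lock, [this] { return size_ < Max; });  // block while full
        buffer_[in_pos_] = data;
        in_pos_ = (in_pos_ + 1) % Max;
        ++size_;
        lock.unlock();
        data_avail_.notify_one();   // one more item for a consumer
    }

    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        data_avail_.wait(lock, [this] { return size_ > 0; });     // block while empty
        T retval = buffer_[out_pos_];
        out_pos_ = (out_pos_ + 1) % Max;
        --size_;
        lock.unlock();
        space_avail_.notify_one();  // one more free slot for a producer
        return retval;
    }

private:
    std::mutex m_;
    std::condition_variable space_avail_, data_avail_;
    T buffer_[Max];
    std::size_t in_pos_ = 0, out_pos_ = 0, size_ = 0;
};

// One producer pushes 1..n through a deliberately tiny queue (Max = 4),
// so it blocks often; one consumer pops and sums. Returns that sum.
long long sum_through_queue(int n) {
    BoundedQueue<int, 4> q;
    long long sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < n; ++i) sum += q.pop();
    });
    for (int i = 1; i <= n; ++i) q.push(i);
    consumer.join();
    return sum;
}
```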

And a rough equivalent of your code in the threads to use it looks something like this. I didn't sort out exactly what your thread function was doing, but it was something with summing square roots, and apparently you're more interested in the thread synch than what the threads actually do, for the moment.

Edit: (based on comment): If you need main() to wait for some tasks to finish, do some more work, then assign more tasks, it's generally best to handle that by putting an event (for example) into each task, and have your thread function set the events. Revised code to do that would look like this (note that the queue code isn't affected):

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

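struct task { 
    int val;
    HANDLE e;   // set by the worker once this task is done

    // A default-constructed task owns no event and doubles as the "exit"
    // marker (val == -1). queue<task> default-constructs its whole buffer,
    // so this constructor must not create kernel objects.
    task() : val(-1), e(NULL) { }
    explicit task(int i) : val(i), e(CreateEvent(NULL, FALSE, FALSE, NULL)) {}
};

// Started with _beginthreadex rather than _beginthread: the handle that
// _beginthread returns is closed automatically when the thread exits, so
// it cannot safely be waited on.
unsigned __stdcall process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "\n";
        SetEvent(t.e);
    }
    return 0;
}

// Wait until every task of the current round has signalled, then free the events.
void wait_and_close(std::vector<HANDLE> &events) {
    WaitForMultipleObjects((DWORD)events.size(), &events[0], TRUE, INFINITE);
    for (size_t i=0; i<events.size(); ++i)
        CloseHandle(events[i]);
    events.clear();
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };   // at most MAXIMUM_WAIT_OBJECTS (64) per round

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthreadex(NULL, 0, process, &jobs, 0, NULL));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }
    wait_and_close(events);

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }
    wait_and_close(events);

    // One exit marker per worker thread
    for (int j=0; j<thread_count; ++j)
        jobs.push(task());

    WaitForMultipleObjects((DWORD)threads.size(), &threads[0], TRUE, INFINITE);
    for (size_t j=0; j<threads.size(); ++j)
        CloseHandle(threads[j]);

    return 0;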
}
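The per-task event above maps naturally onto std::future in standard C++11. A minimal sketch, assuming a hypothetical Pool class (not part of the answer or of any library): the worker threads are created once, block on a condition variable while idle, and main waits on one future per task before starting the next round, so the threads never have to be recreated.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical fixed-size pool. Each submitted job gets its own
// std::future, which plays the role of the per-task event.
class Pool {
public:
    explicit Pool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }

    ~Pool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto &w : workers_) w.join();
    }

    std::future<void> submit(std::function<void()> f) {
        std::packaged_task<void()> task(std::move(f));
        std::future<void> done = task.get_future();
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
        return done;
    }

private:
    void run() {
        for (;;) {
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (tasks_.empty()) return;   // stop requested and queue drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();                           // runs the job, makes its future ready
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::packaged_task<void()>> tasks_;
    std::vector<std::thread> workers_;
    bool stop_ = false;
};

// Two rounds of `tasks` jobs on the same threads; main waits for a whole
// round before reading its results. Returns the sum over both rounds.
long long two_rounds(unsigned threads, int tasks) {
    Pool pool(threads);
    std::vector<long long> results(tasks);
    long long total = 0;
    for (int round = 0; round < 2; ++round) {
        std::vector<std::future<void>> done;
        for (int i = 0; i < tasks; ++i)
            done.push_back(pool.submit([&results, i, round] {
                results[i] = (long long)(i + 1) * (round + 1);
            }));
        for (auto &f : done) f.wait();        // like WaitForMultipleObjects(..., TRUE, ...)
        for (long long r : results) total += r;
    }
    return total;
}
```

For example, two_rounds(4, 10) runs two rounds of ten jobs on the same four threads.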
Jerry Coffin
As far as I understand your code, the synch happens on the last WaitForMultipleObjects, when you wait for all threads to die. I cannot afford letting them die. The code is in an infinite loop which will need the threads to sleep some time and then munch more jobs. I need to be sure all the jobs have been done by the threads before main proceeds and does other stuff, before it gets back to those threads and gives them more jobs. I can't create new threads every time; that's too slow, even with _beginthread().
Gabriel
A:

the problem happens in the following case:

the main thread resumes the worker threads:

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

the worker threads do their work and release the semaphore:

  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)

the main thread waits for all worker threads and resets the semaphore:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);

the main thread goes into the next round, trying to resume the worker threads (note that the worker threads haven't even suspended themselves yet! this is where the problem starts... you are trying to resume threads that aren't necessarily suspended yet):

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

finally the worker threads suspend themselves (although they should already be starting the next round):

  SuspendThread(ids[(int) lpParameter]);

and the main thread waits forever since all workers are suspended now:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
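One way to avoid this race without SuspendThread/ResumeThread is to make the start signal part of shared state. The following portable C++11 sketch (the RoundDispatcher class and every_worker_ran function are hypothetical) uses a round counter protected by a mutex: a worker only waits while the counter still equals the last round it ran, so if main starts the next round before the worker is back in its wait, the wait returns immediately instead of blocking forever.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical dispatcher: main starts a round by bumping generation_
// instead of calling ResumeThread. A start signal issued "too early" is
// still visible to the worker, so it cannot be lost.
class RoundDispatcher {
public:
    explicit RoundDispatcher(int workers) : n_(workers), runs_(workers, 0) {
        for (int id = 0; id < n_; ++id)
            threads_.emplace_back([this, id] { work(id); });
    }

    ~RoundDispatcher() {
        {
            std::lock_guard<std::mutex> lock(m_);
            quit_ = true;
        }
        start_cv_.notify_all();
        for (auto &t : threads_) t.join();
    }

    // Start one round on every worker and block until all of them finish it.
    void run_round() {
        std::unique_lock<std::mutex> lock(m_);
        ++generation_;
        finished_ = 0;
        start_cv_.notify_all();
        done_cv_.wait(lock, [this] { return finished_ == n_; });
    }

    int runs_of(int id) {
        std::lock_guard<std::mutex> lock(m_);
        return runs_[id];
    }

private:
    void work(int id) {
        long seen = 0;                        // last round this worker ran
        std::unique_lock<std::mutex> lock(m_);
        for (;;) {
            start_cv_.wait(lock, [&] { return quit_ || generation_ != seen; });
            if (quit_) return;
            seen = generation_;
            lock.unlock();
            // ... the real per-round job would run here, without the lock ...
            lock.lock();
            ++runs_[id];
            if (++finished_ == n_) done_cv_.notify_one();
        }
    }

    const int n_;
    std::mutex m_;
    std::condition_variable start_cv_, done_cv_;
    std::vector<int> runs_;
    std::vector<std::thread> threads_;
    long generation_ = 0;
    int finished_ = 0;
    bool quit_ = false;
};

// Runs `rounds` rounds on `workers` threads; true if each worker ran
// exactly once per round.
bool every_worker_ran(int workers, int rounds) {
    RoundDispatcher d(workers);
    for (int r = 0; r < rounds; ++r) d.run_round();
    for (int id = 0; id < workers; ++id)
        if (d.runs_of(id) != rounds) return false;
    return true;
}
```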

here's a link that shows how to correctly solve producer/consumer problems:

http://en.wikipedia.org/wiki/Producer-consumer_problem

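also i think critical sections are much faster than semaphores and mutexes, since an uncontended EnterCriticalSection doesn't need a kernel call (the trade-off is that they only work between threads of one process). they're also easier to understand in most cases (imo).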

stmax
Nice theory/explanation. Putting main to Sleep(1) before resuming the threads seems to solve the problem, but then performance is gone. At least this confirms the theory: main resumes threads that are not suspended yet \o/
Gabriel
A: 

The problem is that you are waiting more often than you are signaling.

With four CPUs, the for (int j=0 ; j<TRY ; j++) loop waits eight times for the semaphore on each pass, while the four threads will only signal once each and the loop itself signals it once. The first time through the loop, this is not an issue because the semaphore is given an initial count of four. The second and each subsequent time, you are waiting for too many signals. This is mitigated by the fact that on the first four waits you limit the time and don't retry on error. So sometimes it may work and sometimes your wait will hang.

I think the following (untested) changes will help.

Initialize the semaphore to zero count:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

Get rid of the wait in the thread resumption loop (i.e. remove the following):

   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)  
      printf("Timed out !!!\n");  

Remove the extraneous signal from the end of the try loop (i.e. remove the following):

ReleaseSemaphore(semaphore,numCPU,NULL);
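The accounting behind these changes is that, per round, the number of waits must equal the number of signals. A small portable C++11 sketch of that invariant, assuming a hypothetical mutex/condition-variable Semaphore and a hypothetical leftover_after_rounds function (new threads are started each round only to keep it short): the semaphore starts at zero, each worker releases it once, main acquires it numCPU times, and nothing else releases it, so the count is back to zero after every round.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical minimal counting semaphore (std::mutex + condvar).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void release() {
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    int count() { std::lock_guard<std::mutex> lock(m_); return count_; }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// Returns the count left in the semaphore after all rounds
// (0 when waits and signals balance).
int leftover_after_rounds(int numCPU, int rounds) {
    Semaphore done(0);                         // initial count 0, as suggested
    for (int r = 0; r < rounds; ++r) {
        std::vector<std::thread> workers;
        for (int i = 0; i < numCPU; ++i)
            workers.emplace_back([&done] { done.release(); });  // one signal per worker
        for (int i = 0; i < numCPU; ++i)
            done.acquire();                    // exactly one wait per signal
        for (auto &w : workers) w.join();
        // no extra release at the end of the round
    }
    return done.count();
}
```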
Dingo
A: 

Here is a practical solution.

I wanted my main program to use threads (and thus more than one core) to munch jobs, and to wait for all the threads to complete before resuming and doing other stuff. I did not want to let the threads die and create new ones because that's slow. In my question, I was trying to do that by suspending the threads, which seemed natural. But as nobugz pointed out, "Thou canst not control threading with Suspend/ResumeThread()".

The solution uses semaphores, like the one I was using to control the threads, but now there is one semaphore per thread to release that thread, plus one more semaphore that main waits on until every thread has finished.

Here is the solution:

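#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>

#define TRY  500000
#define LOOP 100

HANDLE *ids;
HANDLE *semaphores;
HANDLE allThreadsSemaphore;
volatile LONG quit = 0;   // set by main when the workers should exit

DWORD WINAPI Count(__in LPVOID lpParameter)
{   
    int index = (int)(INT_PTR)lpParameter;
    float x = 1.0f;         
    while(1)
    {   
        // Sleep until main releases this thread's semaphore
        WaitForSingleObject(semaphores[index],INFINITE);
        if (quit)
            break;
        for (int i=1 ; i<LOOP ; i++)
            x = sqrt((float)i*x+rand());
        // Tell main that one more thread has finished its job
        ReleaseSemaphore(allThreadsSemaphore,1,NULL);
    }
    return (DWORD)(int)x;
}

int main()
{
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    int numCPU = sysinfo.dwNumberOfProcessors;

    ids = new HANDLE[numCPU];
    semaphores = new HANDLE[numCPU]; 

    // Create all semaphores BEFORE the threads: a thread starts running
    // immediately and must not wait on a handle that doesn't exist yet.
    // Threads blocked until main releases them one by one
    for (int j=0 ; j<numCPU ; j++)
        semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL);
    // Blocks main until threads finish
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

    for (int j=0 ; j<numCPU ; j++)
        ids[j] = CreateThread(NULL, 0, Count, (LPVOID)(INT_PTR)j, 0, NULL);

    for (int j=0 ; j<TRY ; j++)
    {
        for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs
            ReleaseSemaphore(semaphores[i],1,NULL);
        for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish
            WaitForSingleObject(allThreadsSemaphore,INFINITE);
    }

    // Let the threads exit before closing the handles they wait on
    InterlockedExchange(&quit, 1);
    for (int i=0 ; i<numCPU ; i++)
        ReleaseSemaphore(semaphores[i],1,NULL);
    WaitForMultipleObjects(numCPU, ids, TRUE, INFINITE);

    for (int j=0 ; j<numCPU ; j++)
    {
        CloseHandle(ids[j]);
        CloseHandle(semaphores[j]);
    }
    CloseHandle(allThreadsSemaphore);
    delete[] ids;
    delete[] semaphores;
    printf("Done\n");
    getc(stdin);
    return 0;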
}
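The same handshake (one start semaphore per worker, one shared "all done" semaphore, all starting at zero) can be sketched portably in standard C++11. This is a rough translation under assumptions, not the code above: Semaphore is a hypothetical mutex/condition-variable stand-in for a Win32 semaphore, run_rounds is a hypothetical driver, and the quit flag is added so the workers can exit cleanly.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a Win32 semaphore (no maximum count enforced).
class Semaphore {
public:
    explicit Semaphore(int initial = 0) : count_(initial) {}
    void release() {
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// All semaphores exist before any thread starts. Returns the number of
// jobs the workers completed (numCPU * tries if every round completed).
long run_rounds(int numCPU, int tries) {
    std::vector<Semaphore> start(numCPU);  // workers blocked until main releases them
    Semaphore allDone(0);                  // blocks main until the workers finish
    std::atomic<bool> quit(false);
    std::atomic<long> jobs(0);

    std::vector<std::thread> threads;
    for (int i = 0; i < numCPU; ++i)
        threads.emplace_back([&, i] {
            for (;;) {
                start[i].acquire();
                if (quit) return;
                ++jobs;                    // the real job would go here
                allDone.release();
            }
        });

    for (int j = 0; j < tries; ++j) {
        for (int i = 0; i < numCPU; ++i) start[i].release();  // let every worker run once
        for (int i = 0; i < numCPU; ++i) allDone.acquire();   // wait for all of them
    }

    quit = true;                                              // then wake workers so they exit
    for (int i = 0; i < numCPU; ++i) start[i].release();
    for (auto &t : threads) t.join();
    return jobs;
}
```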
Gabriel