Hi,

Using POSIX threads & C++, I have an "Insert operation" which can only be done safely one at a time.

Suppose multiple threads are waiting to insert, each calling pthread_join on the current insert thread and then spawning a new thread when it finishes. Will they all receive the "thread complete" signal at once and spawn multiple inserts? Or is it safe to assume that the thread that receives the "thread complete" signal first will spawn a new thread, blocking the others from creating new threads?

/* --- GLOBAL --- */
pthread_t insertThread;



/* --- DIFFERENT THREADS --- */
// Wait for Current insert to finish
pthread_join(insertThread, NULL); 

// Done start a new one
pthread_create(&insertThread, NULL, Insert, Data);
+2  A: 

According to the Single Unix Specification: "The results of multiple simultaneous calls to pthread_join() specifying the same target thread are undefined."

The "normal way" of achieving a single thread to get the task would be to set up a condition variable (don't forget the related mutex): idle threads wait in pthread_cond_wait() (or pthread_cond_timedwait()), and when the thread doing the work has finished, it wakes up one of the idle ones with pthread_cond_signal().
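
A minimal sketch of that idea, under some assumptions: the names g_lock, g_idle, g_busy, g_table and guarded_insert are made up for illustration, and a std::vector stands in for whatever the real insert touches. Each thread waits until nobody is inserting, claims the slot, does its insert, and then wakes one waiter.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical shared state; every name here is illustrative.
pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  g_idle = PTHREAD_COND_INITIALIZER;
bool g_busy = false;            // true while some thread is inserting
std::vector<int> g_table;       // stand-in for the real data structure

void guarded_insert(int value)
{
    pthread_mutex_lock(&g_lock);
    // Loop on the predicate: wakeups may be spurious, and another thread
    // may have claimed the slot before this one reacquired the mutex.
    while (g_busy)
        pthread_cond_wait(&g_idle, &g_lock);
    g_busy = true;
    pthread_mutex_unlock(&g_lock);

    g_table.push_back(value);   // the one-at-a-time operation

    pthread_mutex_lock(&g_lock);
    g_busy = false;
    pthread_cond_signal(&g_idle);   // wake one waiting thread, if any
    pthread_mutex_unlock(&g_lock);
}

void *client(void *arg)
{
    int base = *static_cast<int *>(arg);
    for (int i = 0; i < 1000; ++i)
        guarded_insert(base + i);
    return NULL;
}

// Runs four client threads and returns how many values were inserted.
std::size_t run_condvar_demo()
{
    pthread_t threads[4];
    int bases[4] = {0, 1000, 2000, 3000};
    for (int i = 0; i < 4; ++i) {
        int rc = pthread_create(&threads[i], NULL, client, &bases[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 4; ++i)
        pthread_join(threads[i], NULL);   // each thread is joined exactly once
    return g_table.size();
}
```

Calling run_condvar_demo() from main() should leave all 4000 values in g_table. Note that no thread ever joins another inserter here; the busy flag plus the condition variable replaces the pthread_join hand-off from the question.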

Timo Metsälä
+3  A: 

From opengroup.org on pthread_join:

The results of multiple simultaneous calls to pthread_join() specifying the same target thread are undefined.

So, you really should not have several threads joining your previous insertThread.

First, as you use C++, I recommend boost.thread. It resembles the POSIX threading model and also works on Windows. It also helps you with C++, e.g. by making function objects easier to use.

Second, why do you want to start a new thread for inserting an element when you always have to wait for the previous one to finish before you start the next one? That does not seem to be a classical use of multiple threads.

Although... One classical solution to this would be to have one worker-thread getting jobs from an event-queue, and other threads posting the operation onto the event-queue.

If you really just want to keep it more or less the way you have it now, you'd have to do this:

  • Create a condition variable, like insert_finished.
  • All the threads which want to do an insert, wait on the condition variable.
  • As soon as one thread is done with its insertion, it fires the condition variable.
  • As the condition variable requires a mutex, you can just notify all waiting threads. They all want to start inserting, but as only one thread can acquire the mutex at a time, the threads will do their inserts sequentially.

But you should take care that your synchronization is not implemented in too ad-hoc a way. As this is called insert, I suspect you want to manipulate a data-structure, so you probably want to implement a thread-safe data-structure first, instead of sharing the synchronization between data-structure-accesses and all clients. I also suspect that there will be more operations than just insert, which will need proper synchronization...

gimpf
A: 

The others have already pointed out this has undefined behaviour. I'd just add that the simplest way to accomplish your task (allowing only one thread at a time to execute a piece of code) is to use a simple mutex - you need the threads executing that code to be MUTually EXclusive, and that's where mutex got its name :-)

If you need the code to be run in a specific thread (like Java AWT), then you need condition variables. However, you should think twice about whether this solution actually pays off. Imagine how many context switches you need if you call your "Insert operation" 10000 times per second.
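
To make the mutex suggestion concrete, here is a rough sketch that puts the lock inside the data structure so callers cannot forget it. LockedTable, Job, insert_range and run_mutex_demo are hypothetical names, and a std::set stands in for the real table; this is only one way it could look.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <set>

// Hypothetical wrapper; the std::set stands in for the real table.
class LockedTable {
public:
    LockedTable()  { pthread_mutex_init(&mutex_, NULL); }
    ~LockedTable() { pthread_mutex_destroy(&mutex_); }

    void insert(int key)
    {
        pthread_mutex_lock(&mutex_);     // only one thread at a time gets past here
        keys_.insert(key);
        pthread_mutex_unlock(&mutex_);
    }

    std::size_t size()
    {
        pthread_mutex_lock(&mutex_);
        std::size_t n = keys_.size();
        pthread_mutex_unlock(&mutex_);
        return n;
    }

private:
    LockedTable(const LockedTable &);             // non-copyable
    LockedTable &operator=(const LockedTable &);

    pthread_mutex_t mutex_;
    std::set<int> keys_;
};

struct Job { LockedTable *table; int first; int count; };

void *insert_range(void *arg)
{
    Job *job = static_cast<Job *>(arg);
    for (int i = 0; i < job->count; ++i)
        job->table->insert(job->first + i);
    return NULL;
}

// Four threads insert 500 distinct keys each; returns the final key count.
std::size_t run_mutex_demo()
{
    LockedTable table;
    pthread_t threads[4];
    Job jobs[4];
    for (int i = 0; i < 4; ++i) {
        jobs[i].table = &table;
        jobs[i].first = i * 500;
        jobs[i].count = 500;
        int rc = pthread_create(&threads[i], NULL, insert_range, &jobs[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 4; ++i)
        pthread_join(threads[i], NULL);
    return table.size();
}
```

The threads never wait for each other to finish; they only contend for the mutex for the duration of a single insert.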

jpalecek
A: 

Thank you for the replies

The program is basically a huge hash table which takes requests from clients through Sockets.

Each new client connection spawns a new thread from which it can then perform multiple operations, specifically lookups or inserts. Lookups can be conducted in parallel, but inserts need to be "re-combined" into a single thread. You could say that lookup operations could be done without spawning a new thread for the client; however, they can take a while, causing the server to lock and drop new requests. The design tries to minimize system calls and thread creation as much as possible.

But now that I know it's not safe the way I first thought, I should be able to cobble something together.

Thanks

zootreeves
This doesn't directly answer your question, but your program sounds very similar to memcached.
Tom
A: 

As you just now mentioned you're using a hash-table with several look-ups parallel to insertions, I'd recommend to check whether you can use a concurrent hash-table.

As the exact look-up results are non-deterministic when you're inserting elements simultaneously, such a concurrent hash-map may be exactly what you need. I have not used concurrent hash-tables in C++ myself, but as they are available in Java, you'll surely find a library doing this in C++.

gimpf
A: 

The only library I found that supports inserts without locking out new lookups is Sunrise DD (and I'm not sure whether it supports concurrent inserts).

However, switching from Google's Sparse Hash map more than doubles the memory usage. Lookups should happen fairly infrequently, so rather than trying to write my own library which combines the advantages of both, I would rather just lock the table, suspending lookups while changes are made safely.
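
One possible way to get "parallel lookups, exclusive inserts" is a POSIX read-write lock. This is only a sketch and not something tested against Sparse Hash: SharedTable and the demo functions are hypothetical names, and a std::map stands in for the real hash map.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <map>

// Hypothetical wrapper; std::map stands in for the real hash map.
class SharedTable {
public:
    SharedTable()  { pthread_rwlock_init(&lock_, NULL); }
    ~SharedTable() { pthread_rwlock_destroy(&lock_); }

    // Many lookups may hold the read lock at the same time.
    bool lookup(int key, int *value)
    {
        pthread_rwlock_rdlock(&lock_);
        std::map<int, int>::const_iterator it = data_.find(key);
        bool found = (it != data_.end());
        if (found)
            *value = it->second;
        pthread_rwlock_unlock(&lock_);
        return found;
    }

    // An insert takes the write lock, excluding lookups and other inserts.
    void insert(int key, int value)
    {
        pthread_rwlock_wrlock(&lock_);
        data_[key] = value;
        pthread_rwlock_unlock(&lock_);
    }

private:
    SharedTable(const SharedTable &);             // non-copyable
    SharedTable &operator=(const SharedTable &);

    pthread_rwlock_t lock_;
    std::map<int, int> data_;
};

void *writer(void *arg)
{
    SharedTable *table = static_cast<SharedTable *>(arg);
    for (int i = 0; i < 1000; ++i)
        table->insert(i, 2 * i);
    return NULL;
}

void *reader(void *arg)
{
    SharedTable *table = static_cast<SharedTable *>(arg);
    int value = 0;
    for (int i = 0; i < 1000; ++i)
        if (table->lookup(i, &value))
            assert(value == 2 * i);   // a key that is found has its complete value
    return NULL;
}

// One writer and two readers run concurrently; returns how many keys exist afterwards.
int run_rwlock_demo()
{
    SharedTable table;
    pthread_t threads[3];
    int rc = pthread_create(&threads[0], NULL, writer, &table);
    assert(rc == 0);
    for (int i = 1; i < 3; ++i) {
        rc = pthread_create(&threads[i], NULL, reader, &table);
        assert(rc == 0);
    }
    (void)rc;
    for (int i = 0; i < 3; ++i)
        pthread_join(threads[i], NULL);

    int found = 0, value = 0;
    for (int i = 0; i < 1000; ++i)
        if (table.lookup(i, &value))
            ++found;
    return found;
}
```

Whether waiting writers or new readers get priority is left to the implementation, so it is worth measuring if inserts arrive in large batches.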

Thanks again

zootreeves
A: 

It seems to me that you want to serialise inserts to the hashtable.

For this you want a lock - not spawning new threads.

Douglas Leeder
A: 

From your description that looks very inefficient as you are re-creating the insert thread every time you want to insert something. The cost of creating the thread is not 0.

A more common solution to this problem is to spawn an insert thread that waits on a queue (i.e. sits in a loop, sleeping while the queue is empty). Other threads then add work items to the queue. The insert thread picks items off the queue in the order they were added (or by priority if you want) and does the appropriate action.

All you have to do is make sure additions to the queue are protected so that only one thread at a time can modify the actual queue, and that the insert thread does not busy-wait but rather sleeps when nothing is in the queue (see condition variable).
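
Roughly, such a queue could look like the sketch below. InsertQueue, post_insert, close_queue and the demo function are names invented for this example, and a std::vector plays the role of the table that only the worker touches.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Hypothetical work queue; all names are illustrative.
struct InsertQueue {
    pthread_mutex_t  mutex;
    pthread_cond_t   not_empty;
    std::queue<int>  items;
    bool             closed;   // set once no more work will arrive
    std::vector<int> table;    // touched only by the worker thread
};

void queue_init(InsertQueue *q)
{
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    q->closed = false;
}

// Called by any client thread to hand an insert to the worker.
void post_insert(InsertQueue *q, int value)
{
    pthread_mutex_lock(&q->mutex);
    q->items.push(value);
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}

void close_queue(InsertQueue *q)
{
    pthread_mutex_lock(&q->mutex);
    q->closed = true;
    pthread_cond_broadcast(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}

void *insert_worker(void *arg)
{
    InsertQueue *q = static_cast<InsertQueue *>(arg);
    for (;;) {
        pthread_mutex_lock(&q->mutex);
        // Sleep (no busy wait) until there is work or the queue is closed.
        while (q->items.empty() && !q->closed)
            pthread_cond_wait(&q->not_empty, &q->mutex);
        if (q->items.empty()) {            // closed and fully drained
            pthread_mutex_unlock(&q->mutex);
            return NULL;
        }
        int value = q->items.front();
        q->items.pop();
        pthread_mutex_unlock(&q->mutex);

        q->table.push_back(value);         // the insert itself, outside the lock
    }
}

void *producer(void *arg)
{
    InsertQueue *q = static_cast<InsertQueue *>(arg);
    for (int i = 0; i < 1000; ++i)
        post_insert(q, i);
    return NULL;
}

// Three producers feed one worker; returns how many items the worker inserted.
std::size_t run_queue_demo()
{
    InsertQueue q;
    queue_init(&q);
    pthread_t worker, producers[3];
    int rc = pthread_create(&worker, NULL, insert_worker, &q);
    assert(rc == 0);
    for (int i = 0; i < 3; ++i) {
        rc = pthread_create(&producers[i], NULL, producer, &q);
        assert(rc == 0);
    }
    (void)rc;
    for (int i = 0; i < 3; ++i)
        pthread_join(producers[i], NULL);
    close_queue(&q);                       // let the worker drain and exit
    pthread_join(worker, NULL);
    return q.table.size();
}
```

The worker is created once and lives for the whole run, so the per-insert thread creation cost disappears. The wait loops on a predicate (queue non-empty or closed), which is what protects against spurious and missed wakeups.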

Martin York
A: 

Yes, as most people recommended, the best way seems to be to have a worker thread reading from a queue. Some code snippets below:

    pthread_t    insertThread;   // pthread_t is an opaque type; don't initialize it with NULL
    pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t insertConditionDoneMutex    = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  insertConditionNew   = PTHREAD_COND_INITIALIZER;
    pthread_cond_t  insertConditionDone  = PTHREAD_COND_INITIALIZER;

       //Thread for new incoming connection
        void * newBatchInsert()
        {
           for(each Word)
           {
                //Push It into the queue
                pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);
                 lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord);
                pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex);

           }

              //Send signal to worker Thread
              pthread_mutex_lock(&insertConditionNewMutex);
               pthread_cond_signal(&insertConditionNew);
              pthread_mutex_unlock(&insertConditionNewMutex);

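              //Wait until it's finished. pthread_cond_wait() must be called with the
              //mutex locked. NOTE: there is no predicate here, so a broadcast sent
              //before this thread starts waiting is lost; a "done" flag checked in a
              //loop would close that gap.
              pthread_mutex_lock(&insertConditionDoneMutex);
              pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex);
              pthread_mutex_unlock(&insertConditionDoneMutex);

              return NULL;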

        }


            //Worker thread
            void * insertWorker(void *)
            {

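                //pthread_cond_wait() requires the mutex to be locked; it releases it
                //only while waiting. Holding it while working makes signalling
                //threads block until the worker is waiting again.
                pthread_mutex_lock(&insertConditionNewMutex);

                while(1)  
                {

                 pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex);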

                 for (int ii = 0; ii < maxWordLength; ++ii)
                 {     
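                   for (;;)
                   {
                    //Check and take the next word while holding the queue mutex,
                    //since other threads may be pushing at the same time
                    pthread_mutex_lock(&lexicon[ii]->insertQueueMutex);
                    if (lexicon[ii]->insertQueue.empty())
                    {
                        pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex);
                        break;
                    }
                    queueNode * newPendingWord = lexicon[ii]->insertQueue.front();
                    lexicon[ii]->insertQueue.pop();
                    pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex);

                    //Do the insert outside the lock so producers are not blocked
                    lexicon[ii]->insert(newPendingWord->word);

                   }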

                 }

                 //Send signal that it's done
                 pthread_mutex_lock(&insertConditionDoneMutex);
                  pthread_cond_broadcast(&insertConditionDone);
                 pthread_mutex_unlock(&insertConditionDoneMutex);

                }

            }

            int main(int argc, char * const argv[]) 
            {

                pthread_create(&insertThread, NULL, &insertWorker, NULL);


                lexiconServer = new server(serverPort, (void *) newBatchInsert);

                return 0;
            }
zootreeves