Hi,
I'm an amateur programmer experimenting with pthreads, to see to what extent a multi-threaded program can speed up a rather long computation I'm working on. The computation runs through a std::list< string > object, popping the first element off the list and farming it out to a thread that computes something with it. The program keeps track of the active threads and ensures that a fixed number of them is always running. Once the list is empty, the program sorts the resulting data, dumps a data file and terminates.
The multi-threaded version of the program currently does not work. It gets 20 or 40 or 200 or so elements down the list (depending on which list I give it), and segfaults. The segfault seems to happen on particular elements of the list, so the crashes don't appear to be random in any way.
BUT the strange thing is, if I compile with debug symbols and run the program through gdb, the program does not segfault. It runs perfectly. Slowly, of course, but it runs and does everything the way I expect it to.
After playing around with everyone's suggestions for a while, I used (among other things) valgrind's tools to try and sort out what's happening. I've noticed that the simplified code below (which makes no calls outside the std library or the pthread library) causes trouble for helgrind, and this is likely the source of my problems. So here is just the simplified code, followed by helgrind's complaints.
#include <cstdlib>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <list>
#include <iostream>
#include <signal.h>
#include <sys/select.h>

struct thread_detail {
    pthread_t *threadID;
    unsigned long num;
};

pthread_mutex_t coutLock;

void *ThreadToSpawn(void *threadarg)
{
    struct thread_detail *my_data;
    my_data = (struct thread_detail *) threadarg;
    int taskid = my_data->num;
    struct timeval timeout;
    for (unsigned long i=0; i < 10; i++)
    {
        timeout.tv_sec = 0; timeout.tv_usec = 500000; // half-second
        select( 0, NULL, NULL, NULL, & timeout );
        pthread_mutex_lock(&coutLock);
        std::cout << taskid << " "; std::cout.flush();
        pthread_mutex_unlock(&coutLock);
    }
    pthread_exit(NULL);
}

int main (int argc, char *argv[])
{
    unsigned long comp_DONE=0;
    unsigned long comp_START=0;
    unsigned long ms_LAG=10000; // microsecond lag between polling of threads
    // set-up the mutexes
    pthread_mutex_init( &coutLock, NULL );
    if (argc != 3) { std::cout << "Program requires two arguments: (1) number of threads to use,"
        " and (2) tasks to accomplish. \n"; exit(1); }
    unsigned long NUM_THREADS(atoi( argv[1] ));
    unsigned long comp_TODO(atoi(argv[2]));
    std::cout << "Program will have " << NUM_THREADS << " threads. \n";
    std::list < thread_detail > thread_table;
    while (comp_DONE != comp_TODO) // main loop to set-up and track threads
    {
        // poll stack of computations to see if any have finished,
        // extract data and remove completed ones from stack
        std::list < thread_detail >::iterator i(thread_table.begin());
        while (i!=thread_table.end())
        {
            if (pthread_kill(*i->threadID,0)!=0) // thread is dead
            { // if there was relevant info in *i we'd extract it here
                if (pthread_join(*i->threadID, NULL)!=0) { std::cout << "Thread join error!\n"; exit(1); }
                pthread_mutex_lock(&coutLock);
                std::cout << i->num << " done. "; std::cout.flush();
                pthread_mutex_unlock(&coutLock);
                delete i->threadID;
                thread_table.erase(i++);
                comp_DONE++;
            }
            else (i++);
        }
        // if list not full, toss another on the pile
        while ( (thread_table.size() < NUM_THREADS) && (comp_TODO > comp_START) )
        {
            pthread_t *tId( new pthread_t );
            thread_detail Y; Y.threadID=tId; Y.num=comp_START;
            thread_table.push_back(Y);
            int rc( pthread_create( tId, NULL, ThreadToSpawn, (void *)(&(thread_table.back() )) ) );
            if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); }
            pthread_mutex_lock(&coutLock);
            std::cout << comp_START << " start. "; std::cout.flush();
            pthread_mutex_unlock(&coutLock);
            comp_START++;
        }
        // wait a specified amount of time
        struct timeval timeout;
        timeout.tv_sec = 0; timeout.tv_usec = ms_LAG;
        select( 0, NULL, NULL, NULL, & timeout );
    } // the big while loop
    pthread_exit(NULL);
}
Helgrind output
==2849== Helgrind, a thread error detector
==2849== Copyright (C) 2007-2009, and GNU GPL'd, by OpenWorks LLP et al.
==2849== Using Valgrind-3.6.0.SVN-Debian and LibVEX; rerun with -h for copyright info
==2849== Command: ./thread2 2 6
==2849==
Program will have 2 threads.
==2849== Thread #2 was created
==2849== at 0x64276BE: clone (clone.S:77)
==2849== by 0x555E172: pthread_create@@GLIBC_2.2.5 (createthread.c:75)
==2849== by 0x4C2D42C: pthread_create_WRK (hg_intercepts.c:230)
==2849== by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849== by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849==
==2849== Thread #1 is the program's root thread
==2849==
==2849== Possible data race during write of size 8 at 0x7feffffe0 by thread #2
==2849== at 0x4C2D54C: mythread_wrapper (hg_intercepts.c:200)
==2849== This conflicts with a previous read of size 8 by thread #1
==2849== at 0x4C2D440: pthread_create_WRK (hg_intercepts.c:235)
==2849== by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849== by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849==
[0 start.] [1 start.] 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 [0 done.] [1 done.] [2 start.] [3 start.] 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 [2 done.] [3 done.] [4 start.] [5 start.] 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 [4 done.] [5 done.] ==2849==
==2849== For counts of detected and suppressed errors, rerun with: -v
==2849== Use --history-level=approx or =none to gain increased speed, at
==2849== the cost of reduced accuracy of conflicting-access information
==2849== ERROR SUMMARY: 6 errors from 1 contexts (suppressed: 675 from 37)
Presumably I'm using pthreads incorrectly, but it's not clear to me what I'm doing wrong. Moreover, I'm not sure what to make of the helgrind output. Earlier, helgrind was complaining because I had not called pthread_join on threads that, for other reasons, the code knew were dead. Adding the pthread_join took care of those complaints.
Reading various pthread tutorials on-line, I've discovered that it's probably pointless to have so much thread creation and destruction going on, as in the above code. It's probably more efficient to have N threads running simultaneously, and to use mutexes and shared memory to pass data between the "BOSS" thread and the "WORKER" threads, only killing the WORKER threads once, at the end of the program. So that's something I'll eventually have to adjust for, but is there anything obviously wrong with the above code?
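To make the BOSS/WORKER idea concrete, here is a minimal sketch of how I imagine it could look, assuming all the work items are known before the workers start (so a plain mutex is enough and no condition variable is needed). The names Pool, compute, worker and run_pool are hypothetical, and compute is just a stand-in that returns the string's length; this is not taken from my real program.

```cpp
#include <pthread.h>
#include <algorithm>
#include <cstddef>
#include <list>
#include <string>
#include <vector>

// Shared state handed to every worker (hypothetical layout).
struct Pool {
    pthread_mutex_t lock;              // guards both containers below
    std::list<std::string> todo;       // work items, filled before workers start
    std::vector<std::size_t> results;  // one entry per finished item
};

// Stand-in for the real computation (assumption: it only needs the item).
static std::size_t compute(const std::string &item) { return item.size(); }

// Each worker repeatedly pops one item under the lock, computes outside
// the lock, then stores the result under the lock again.
static void *worker(void *arg)
{
    Pool *pool = static_cast<Pool *>(arg);
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        if (pool->todo.empty()) {          // nothing left: this worker is done
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        std::string item = pool->todo.front();  // copy, so no pointer into the list escapes
        pool->todo.pop_front();
        pthread_mutex_unlock(&pool->lock);

        std::size_t r = compute(item);

        pthread_mutex_lock(&pool->lock);
        pool->results.push_back(r);
        pthread_mutex_unlock(&pool->lock);
    }
}

// BOSS side: start the workers once, join each of them once, then sort.
std::vector<std::size_t> run_pool(const std::list<std::string> &items,
                                  std::size_t num_threads)
{
    Pool pool;
    pthread_mutex_init(&pool.lock, NULL);
    pool.todo = items;

    std::vector<pthread_t> threads(num_threads);
    std::size_t started = 0;
    for (; started < num_threads; ++started)
        if (pthread_create(&threads[started], NULL, worker, &pool) != 0)
            break;                         // keep going with the threads we got

    for (std::size_t i = 0; i < started; ++i)
        pthread_join(threads[i], NULL);    // blocks until that worker returns

    pthread_mutex_destroy(&pool.lock);
    std::sort(pool.results.begin(), pool.results.end());
    return pool.results;
}
```

Calling run_pool(items, 4) from main would start four workers and return the sorted results once the list is drained. The BOSS never has to poll for dead threads in this layout: each worker exits on its own when the list is empty, and pthread_join simply waits for it.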
Edit: I'm noticing some keywords more and more often. The terminology for the thing I'm trying to create is apparently a thread pool. Moreover, there are various proposals for standard implementations of this, for example in the boost library there's boost::threadpool, boost::task, boost::thread. Some of these appear to be only proposals. I come across threads here where people mention you can combine ASIO and boost::thread to accomplish what I'm looking for. Similarly there's a message queue class.
Hmm, so it seems like I'm scratching at the surface of a topic many people are thinking about nowadays, but it seems kind of germinal, like OOP was in 1989 or something.