I've got the following two programs, one acting as a reader and the other as a writer. The writer seems to only send about 3/4 of the data correctly to be read by the reader. Is there any way to guarantee that all the data is being sent? I think I've got it set up so that it reads and writes reliably, but it still seems to miss 1/4 of the data.
Heres the source of the writer
#define pipe "/tmp/testPipe"
using namespace std;
queue<string> sproutFeed;
ssize_t r_write(int fd, char *buf, size_t size) {
char *bufp;
size_t bytestowrite;
ssize_t byteswritten;
size_t totalbytes;
for (bufp = buf, bytestowrite = size, totalbytes = 0;
bytestowrite > 0;
bufp += byteswritten, bytestowrite -= byteswritten) {
byteswritten = write(fd, bufp, bytestowrite);
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
if ((byteswritten) == -1 && (errno != EINTR))
return -1;
if (byteswritten == -1)
byteswritten = 0;
totalbytes += byteswritten;
}
return totalbytes;
}
void* sendData(void *thread_arg)
{
int fd, ret_val, count, numread;
string word;
char bufpipe[5];
ret_val = mkfifo(pipe, 0777); //make the sprout pipe
if (( ret_val == -1) && (errno != EEXIST))
{
perror("Error creating named pipe");
exit(1);
}
while(1)
{
if(!sproutFeed.empty())
{
string s;
s.clear();
s = sproutFeed.front();
int sizeOfData = s.length();
snprintf(bufpipe, 5, "%04d\0", sizeOfData);
char stringToSend[strlen(bufpipe) + sizeOfData +1];
bzero(stringToSend, sizeof(stringToSend));
strncpy(stringToSend,bufpipe, strlen(bufpipe));
strncat(stringToSend,s.c_str(),strlen(s.c_str()));
strncat(stringToSend, "\0", strlen("\0"));
int fullSize = strlen(stringToSend);
signal(SIGPIPE,SIG_IGN);
fd = open(pipe,O_WRONLY);
int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
cout << errno << endl;
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
if(numWrite != fullSize )
{
signal(SIGPIPE,SIG_IGN);
bzero(bufpipe, strlen(bufpipe));
bzero(stringToSend, strlen(stringToSend));
close(fd);
}
else
{
signal(SIGPIPE,SIG_IGN);
sproutFeed.pop();
close(fd);
bzero(bufpipe, strlen(bufpipe));
bzero(stringToSend, strlen(stringToSend));
}
}
else
{
if(usleep(.0002) == -1)
{
perror("sleeping error\n");
}
}
}
}
int main(int argc, char *argv[])
{
signal(SIGPIPE,SIG_IGN);
int x;
for(x = 0; x < 100; x++)
{
sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
}
int rc, i , status;
pthread_t threads[1];
printf("Starting Threads...\n");
pthread_create(&threads[0], NULL, sendData, NULL);
rc = pthread_join(threads[0], (void **) &status);
}
Heres the source of the reader
#define pipe "/tmp/testPipe"
char dataString[50000];
using namespace std;
char *getSproutItem();
void* readItem(void *thread_arg)
{
while(1)
{
x++;
char *s = getSproutItem();
if(s != NULL)
{
cout << "READ IN: " << s << endl;
}
}
}
ssize_t r_read(int fd, char *buf, size_t size) {
ssize_t retval;
while (retval = read(fd, buf, size), retval == -1 && errno == EINTR) ;
return retval;
}
char * getSproutItem()
{
cout << "Getting item" << endl;
char stringSize[4];
bzero(stringSize, sizeof(stringSize));
int fd = open(pipe,O_RDONLY);
cout << "Reading" << endl;
int numread = r_read(fd,stringSize, sizeof(stringSize));
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
cout << "Read Complete" << endl;
if(numread > 1)
{
stringSize[numread] = '\0';
int length = atoi(stringSize);
char recievedString[length];
bzero(recievedString, sizeof(recievedString));
int numread1 = r_read(fd, recievedString, sizeof(recievedString));
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
if(numread1 > 1)
{
recievedString[numread1] = '\0';
cout << "DATA RECIEVED: " << recievedString << endl;
bzero(dataString, sizeof(dataString));
strncpy(dataString, recievedString, strlen(recievedString));
strncat(dataString, "\0", strlen("\0"));
close(fd);
return dataString;
}
else
{
return NULL;
}
}
else
{
return NULL;
}
close(fd);
}
int main(int argc, char *argv[])
{
int rc, i , status;
pthread_t threads[1];
printf("Starting Threads...\n");
pthread_create(&threads[0], NULL, readItem, NULL);
rc = pthread_join(threads[0], (void **) &status);
}