I have the following two programs, one acting as a reader and the other as a writer, communicating over a named pipe. Only about 3/4 of the data the writer sends reaches the reader correctly. Is there any way to guarantee that all of the data is delivered? I think I've set it up so that it reads and writes reliably, but about 1/4 of the data still goes missing.

Here's the source of the writer:

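#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <queue>
#include <string>
#include <fcntl.h>
#include <pthread.h>
#include <strings.h>
#include <sys/stat.h>
#include <unistd.h>

#define pipe "/tmp/testPipe"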

using namespace std;

queue<string> sproutFeed;


ssize_t r_write(int fd, char *buf, size_t size) {
   char *bufp;
   size_t bytestowrite;
   ssize_t byteswritten;
   size_t totalbytes;

   for (bufp = buf, bytestowrite = size, totalbytes = 0;
        bytestowrite > 0;
        bufp += byteswritten, bytestowrite -= byteswritten) {
      byteswritten = write(fd, bufp, bytestowrite);
      if(errno == EPIPE)
      {
      signal(SIGPIPE,SIG_IGN);
      }
      if ((byteswritten) == -1 && (errno != EINTR))
         return -1;
      if (byteswritten == -1)
         byteswritten = 0;
      totalbytes += byteswritten;
   }
   return totalbytes;
}


void* sendData(void *thread_arg)
{

int fd, ret_val, count, numread;
string word;
char bufpipe[5];


ret_val = mkfifo(pipe, 0777); //make the sprout pipe

if (( ret_val == -1) && (errno != EEXIST)) 
{
 perror("Error creating named pipe");
 exit(1);
} 
while(1)
{
 if(!sproutFeed.empty())
 {
  string s;
  s.clear();
  s = sproutFeed.front();
  int sizeOfData = s.length();
  snprintf(bufpipe, 5, "%04d\0", sizeOfData); 
  char stringToSend[strlen(bufpipe) + sizeOfData +1];
  bzero(stringToSend, sizeof(stringToSend));     
  strncpy(stringToSend,bufpipe, strlen(bufpipe));   
  strncat(stringToSend,s.c_str(),strlen(s.c_str()));
  strncat(stringToSend, "\0", strlen("\0"));     
  int fullSize = strlen(stringToSend);   
  signal(SIGPIPE,SIG_IGN);

  fd = open(pipe,O_WRONLY);
  int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
  cout << errno << endl;
  if(errno == EPIPE)
  {
  signal(SIGPIPE,SIG_IGN);
  }

  if(numWrite != fullSize )
  {    
   signal(SIGPIPE,SIG_IGN);
   bzero(bufpipe, strlen(bufpipe));
   bzero(stringToSend, strlen(stringToSend));
   close(fd);
  }
  else
  {
   signal(SIGPIPE,SIG_IGN);
   sproutFeed.pop();
   close(fd);
   bzero(bufpipe, strlen(bufpipe));
   bzero(stringToSend, strlen(stringToSend));
  }     
 }
 else
 {
  if(usleep(200) == -1) // usleep() takes whole microseconds; .0002 truncates to 0
  {
   perror("sleeping error\n");
  }
 }
}

}

int main(int argc, char *argv[])
{
    signal(SIGPIPE,SIG_IGN);
    int x;
    for(x = 0; x < 100; x++)
    {
     sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
    }
    int rc, i , status;
    pthread_t threads[1];  
    printf("Starting Threads...\n");
    pthread_create(&threads[0], NULL, sendData, NULL);
    rc = pthread_join(threads[0], NULL); // passing an int's address as void** is unsafe where pointers are wider than int

}

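Here's the source of the reader:

#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <fcntl.h>
#include <pthread.h>
#include <strings.h>
#include <unistd.h>

#define pipe "/tmp/testPipe"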

char dataString[50000];
using namespace std;
char *getSproutItem();

void* readItem(void *thread_arg)
{
    int x = 0; // loop counter; it was incremented without being declared
    while(1)
    {
     x++;
     char *s = getSproutItem();
     if(s != NULL)
     {
      cout << "READ IN: " << s << endl;
     }
    }
}


ssize_t r_read(int fd, char *buf, size_t size) {
   ssize_t retval;
   while (retval = read(fd, buf, size), retval == -1 && errno == EINTR) ;
   return retval;
}


char * getSproutItem()
{
    cout << "Getting item" << endl;
    char stringSize[5]; // 4 header digits plus room for the terminating '\0'
    bzero(stringSize, sizeof(stringSize));
    int fd = open(pipe,O_RDONLY);
    cout << "Reading" << endl;

    int numread = r_read(fd,stringSize, sizeof(stringSize) - 1);


    if(errno == EPIPE)
    {
     signal(SIGPIPE,SIG_IGN);

    }
    cout << "Read Complete" << endl;

    if(numread > 1)
    {

     stringSize[numread] = '\0'; 
     int length = atoi(stringSize);
     char recievedString[length + 1]; // extra byte for the '\0' written after the read
     bzero(recievedString, sizeof(recievedString));
     int numread1 = r_read(fd, recievedString, length);
     if(errno == EPIPE)
     {
      signal(SIGPIPE,SIG_IGN);
     }
     if(numread1 > 1)
     {
      recievedString[numread1] = '\0';
      cout << "DATA RECIEVED: " << recievedString << endl;
      bzero(dataString, sizeof(dataString));
      strncpy(dataString, recievedString, strlen(recievedString));
      strncat(dataString, "\0", strlen("\0"));
      close(fd);
      return dataString;
     }
     else
     {
      return NULL;
     }
    }
    else
    {
     return NULL;
    }

    close(fd);

}

int main(int argc, char *argv[])
{
     int rc, i , status;
     pthread_t threads[1];  
     printf("Starting Threads...\n");
     pthread_create(&threads[0], NULL, readItem, NULL);
     rc = pthread_join(threads[0], NULL); // passing an int's address as void** is unsafe where pointers are wider than int

}
A: 
1800 INFORMATION
"This is a syntax error" - as far as the standard is concerned. But for example gcc permits it (unless you're -pedantic), since it's in C99. Whatever the compiler, if it accepts it then I would expect it to work.
Steve Jessop
How odd. I wonder how that works behind the scenes, say, if you have more than one dynamically sized array allocated on the stack.
1800 INFORMATION
I assume that a common/similar mechanism is used to implement dynamic arrays and alloca, although the results of the two have different scope. I guess some fancy footwork is needed to reference both variables "above" and "below" the variable-length allocation: either you don't lay them down in scope order, or else you can't use constant offsets from a single sp. Never looked into it, though.
Steve Jessop
OK, so let me see if I get this straight. The right way to do this would be to 1) send the length of the payload first (as is already being done), then 2) have an array of a constant size, like char recievedString[5], and read until I reach the end or get an EOF?
whatWhat
A: 

The general method used to send data through named pipes is to tack on a header with the length of the payload. Then you read(fd, header_len); read(fd, data_len); Note that the latter read() will need to be done in a loop until data_len bytes have been read or EOF is reached. Note also that if you have multiple writers to a named pipe, the writes are atomic (as long as each is of a reasonable size, i.e. no larger than PIPE_BUF), so multiple writers will not cause partial messages in the kernel buffers.
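
As a rough sketch of the header-then-payload read described above, assuming the 4-digit ASCII length header that the question's writer produces: the names read_full and read_message are made up for this illustration and do not come from the question.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <string>
#include <unistd.h>

// Keep calling read() until `size` bytes have arrived, EOF is hit, or an
// error other than EINTR occurs. Returns the byte count read, or -1 on error.
ssize_t read_full(int fd, char *buf, size_t size) {
    assert(buf != nullptr);
    size_t total = 0;
    while (total < size) {
        ssize_t n = read(fd, buf + total, size - total);
        if (n == -1) {
            if (errno == EINTR) continue;  // interrupted: just retry
            return -1;
        }
        if (n == 0) break;                 // EOF: all writers closed the pipe
        total += static_cast<size_t>(n);
    }
    return static_cast<ssize_t>(total);
}

// Read one message: a 4-digit decimal length followed by that many bytes.
// Returns false on EOF, error, or a truncated message.
bool read_message(int fd, std::string &out) {
    const size_t kHeaderLen = 4;           // assumption: matches "%04d" in the writer
    char header[kHeaderLen + 1] = {0};
    if (read_full(fd, header, kHeaderLen) != static_cast<ssize_t>(kHeaderLen))
        return false;
    long len = std::strtol(header, nullptr, 10);
    if (len < 0) return false;
    out.assign(static_cast<size_t>(len), '\0');
    if (len == 0) return true;
    return read_full(fd, &out[0], out.size()) == static_cast<ssize_t>(len);
}
```

With helpers like these the reader can open the FIFO once and call read_message in a loop, instead of reopening the pipe for every item.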

pixelbeat
A: 

You are possibly getting bitten by POSIX thread signal handling semantics in your reader's main thread. The POSIX standard allows a signal to be delivered to any thread in the process, not necessarily the one you expect. Block signals where they are not wanted. signal(SIGPIPE, SIG_IGN) is your friend. Add one to the reader's main.

POSIX thread handling semantics, putting the POS into POSIX. ( but it does make it easier to implement POSIX threads.)

Examine the pipe in /tmp with ls. Is it not empty?
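
One possible way to "block signals where not wanted" is pthread_sigmask, sketched below. This is a different technique from the signal(SIGPIPE, SIG_IGN) call suggested above: it blocks delivery rather than ignoring the signal. The helper names block_sigpipe and sigpipe_blocked are invented for this example. Threads created after the call inherit the mask of the thread that creates them.

```cpp
#include <cassert>
#include <csignal>
#include <pthread.h>

// Block SIGPIPE in the calling thread. Threads created afterwards inherit
// the creating thread's signal mask, so calling this at the top of main()
// covers the worker threads too. Returns 0 on success, else an error number.
int block_sigpipe() {
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGPIPE);
    return pthread_sigmask(SIG_BLOCK, &set, nullptr);
}

// Report whether SIGPIPE is currently blocked in the calling thread.
bool sigpipe_blocked() {
    sigset_t current;
    sigemptyset(&current);
    // A null `set` argument leaves the mask unchanged and only queries it.
    if (pthread_sigmask(SIG_BLOCK, nullptr, &current) != 0) return false;
    return sigismember(&current, SIGPIPE) == 1;
}
```

Either way, a write() to a pipe with no reader then fails with EPIPE instead of killing the process, so the return value of write() still has to be checked.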

Tim Williscroft
+3  A: 

You are definitely using signals the wrong way. Threads are completely unnecessary here, at least in the code provided. The string calculations are just weird. Get this book and do not touch the keyboard until you have finished reading :)
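
For comparison, here is a minimal sketch of how the writer's message could be built without the strncpy/strncat sequence, assuming the same 4-digit length header as in the question. make_frame and write_all are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <string>
#include <unistd.h>

// Build "NNNN" + payload, where NNNN is the payload length in decimal.
// Returns an empty string if the length does not fit in four digits.
std::string make_frame(const std::string &payload) {
    if (payload.size() > 9999) return std::string();
    char header[5];  // 4 digits plus the terminating '\0'
    std::snprintf(header, sizeof(header), "%04zu", payload.size());
    return std::string(header) + payload;
}

// Write the whole buffer, retrying after short writes and EINTR.
// Returns false on any other error (for example EPIPE).
bool write_all(int fd, const std::string &data) {
    size_t sent = 0;
    while (sent < data.size()) {
        ssize_t n = write(fd, data.data() + sent, data.size() - sent);
        if (n == -1) {
            if (errno == EINTR) continue;
            return false;
        }
        sent += static_cast<size_t>(n);
    }
    return true;
}
```

The writer loop could then open the FIFO once, call write_all(fd, make_frame(s)) for each queued string, and pop the item only when the call returns true.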

Nikolai N Fetissov
I was getting really frustrated and just started adding stuff. I ended up switching from named pipes to message queues. My problem is more easily solved with message-based IPC than with stream-based IPC.
whatWhat
Just be warned that SysV IPC limits you to a single machine. I'd recommend learning proper socket programming, which works well within one machine and across the network.
Nikolai N Fetissov