views:

114

answers:

1

Ultimately I'm trying to transfer buffers from one machine to another. The code below takes stream of <id><size><data with size bytes> and reads the part in the handleReadHeader function, then reads the <size> number of bytes, then goes back and waits for another <id><size> pair. I've pasted a lot of code, but really the only functions I'm suspicious of are:
Downlink::addMsgToQueue
Downlink::writeCallback
Downlink::startWrites() Downlink::handleReadHeader
Downlink::handleReadFrameDataBGR

using namespace std;
using namespace boost;
using namespace boost::asio;

Downlink::Downlink() :
  socket(nIO),
  headerSize(sizeof(unsigned int)+1),
  connected(false),
  isWriting(false),
  readHeaderBuffer(headerSize)
{}

Downlink::~Downlink() {
  disconnect();
}

bool Downlink::connect(const std::string &robotHost, unsigned int port) {
  disconnect();

  ip::tcp::resolver resolver(nIO);
  ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port));
  ip::tcp::resolver::iterator iterator = resolver.resolve(query);

  ip::tcp::resolver::iterator end;
  boost::system::error_code ec;
  for(;iterator!=end;++iterator) {
    socket.connect(*iterator, ec);
    if(!ec)
      break;
    socket.close();
  }
  if(!socket.is_open())
    return false;

  async_read(socket, buffer(readHeaderBuffer), 
      bind(&Downlink::handleReadHeader, this, _1, _2));

  //start network thread.
  lock_guard<mutex> l(msgMutex);
  outgoingMessages = queue<vector<char> >();
  nIO.reset();
  t = thread(bind(&boost::asio::io_service::run, &nIO));
  connected = true;
  return true;
}

bool Downlink::isConnected() const {
  return connected;
}

void Downlink::disconnect() {
  nIO.stop();
  t.join();
  socket.close();
  connected = false;
  isWriting = false;
  nIO.reset();
  nIO.run();
}

void Downlink::writeToLogs(const std::string &logMsg) {
  vector<char> newMsg(logMsg.length()+headerSize);
  newMsg[0] = MSG_WRITE_LOG;
  const unsigned int msgLen(logMsg.length());
  memcpy(&newMsg[1], &msgLen, sizeof(unsigned int));
  vector<char>::iterator dataBegin = newMsg.begin();
  advance(dataBegin, headerSize);
  copy(logMsg.begin(), logMsg.end(), dataBegin);
  assert(newMsg.size()==(headerSize+logMsg.length()));
  addMsgToQueue(newMsg);
}

void Downlink::addMsgToQueue(const std::vector<char> &newMsg) {
  lock_guard<mutex> l(msgMutex);
  outgoingMessages.push(newMsg);
  lock_guard<mutex> l2(outMutex);
  if(!isWriting) {
    nIO.post(bind(&Downlink::startWrites, this));
  }
}

void Downlink::writeCallback(const boost::system::error_code& error,
        std::size_t bytes_transferred) {
  if(error) {
    disconnect();
    lock_guard<mutex> l(msgMutex);
    outgoingMessages = queue<vector<char> >();
    return;
  }
  {
    lock_guard<mutex> l2(outMutex);
    isWriting = false;
  }
  startWrites();
}


void Downlink::startWrites() {
  lock_guard<mutex> l(msgMutex);
  lock_guard<mutex> l2(outMutex);
  if(outgoingMessages.empty()) {
    isWriting = false;
    return;
  }

  if(!isWriting) {
    currentOutgoing = outgoingMessages.front();
    outgoingMessages.pop();
    async_write(socket, buffer(currentOutgoing),
  bind(&Downlink::writeCallback, this, _1, _2));
    isWriting = true;
  }
}

void Downlink::handleReadHeader(const boost::system::error_code& error,
    std::size_t bytes_transferred) {
  //TODO: how to handle disconnect on errors?
  cout<<"handleReadHeader"<<endl;
  if(error) {
    return;
  }
  assert(bytes_transferred==headerSize);
  if(bytes_transferred!=headerSize) {
    cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl;
  }
  currentPacketID = readHeaderBuffer[0];

  memcpy(&currentPacketLength, &readHeaderBuffer[1], sizeof(unsigned int));
  dataStream.resize(currentPacketLength);
  switch(currentPacketID) {
  case MSG_FRAME_BGR: {
    cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl;
    async_read(socket, asio::buffer(dataStream), 
        boost::asio::transfer_at_least(currentPacketLength),
        bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));    
  } break;
  default: {
    cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl;
    cout<<"      "<<(int)currentPacketID<<endl;
    async_read(socket, asio::buffer(dataStream), 
        boost::asio::transfer_at_least(currentPacketLength),
        bind(&Downlink::handleReadData, this, _1, _2));
  } break;
  }
}

void Downlink::handleReadData(const boost::system::error_code& error,
    std::size_t bytes_transferred) {
  cout<<"handleReadData"<<endl;
  if(error) {
    return;
  }
  if(bytes_transferred!=currentPacketLength) {
    cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
  }
  assert(bytes_transferred==currentPacketLength);

  switch(currentPacketID) {
  case MSG_ASCII: {
    string msg(dataStream.begin(), dataStream.end());
    textCallback(&msg);
  } break;
  case MSG_IMU: {
    Eigen::Vector3d a,g,m;
    unsigned int stamp;
    memcpy(a.data(), &dataStream[0], sizeof(double)*3);
    memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3);
    memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3);
    memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int));
    imuCallback(a,m,g,stamp);
  } break;
  default:
    //TODO: handle this better?
    cout<<"Unknown packet ID."<<endl;
  }

  async_read(socket, buffer(readHeaderBuffer), 
      boost::asio::transfer_at_least(headerSize),
      bind(&Downlink::handleReadHeader, this, _1, _2));
}

void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error,
          std::size_t bytes_transferred) {
  cout<<"Got a frame"<<endl;
  if(error) {
    return;
  }
  if(bytes_transferred!=currentPacketLength) {
    cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
  }
  assert(bytes_transferred==currentPacketLength);
  unsigned int imageWidth, imageHeight, cameraID;

  unsigned char *readOffset = (unsigned char*)&dataStream[0];
  memcpy(&imageWidth, readOffset, sizeof(unsigned int)); 
  readOffset += sizeof(unsigned int);
  memcpy(&imageHeight, readOffset, sizeof(unsigned int)); 
  readOffset += sizeof(unsigned int);
  memcpy(&cameraID, readOffset, sizeof(unsigned int)); 
  readOffset += sizeof(unsigned int);

  cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl;

  frameCallback(readOffset, imageWidth, imageHeight, cameraID);

  async_read(socket, buffer(readHeaderBuffer), 
      boost::asio::transfer_at_least(headerSize),
      bind(&Downlink::handleReadHeader, this, _1, _2));
}


boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) {
  return textCallback.connect(s);
}

boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) {
  return imuCallback.connect(s);
}

boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) {
  return frameCallback.connect(s);
}

Here is the code on the other end. It's almost exactly the same as the other code, but the error could be in either end.

using namespace std;
using namespace boost;
using namespace boost::asio;

Uplink::Uplink(unsigned int port) :
  socket(nIO),
  acceptor(nIO),
  endpoint(ip::tcp::v4(), port),
  headerSize(sizeof(unsigned int)+1), //id + data size
  headerBuffer(headerSize)
{
  //move socket into accept state.
  acceptor.open(endpoint.protocol());
  acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
  acceptor.bind(endpoint);
  acceptor.listen(1);  //1 means only one client in connect queue.
  acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
  //start network thread.
  nIO.reset();
  t = thread(boost::bind(&boost::asio::io_service::run, &nIO));
}

Uplink::~Uplink() {
  nIO.stop();  //tell the network thread to stop.
  t.join();  //wait for the network thread to stop.
  acceptor.close(); //close listen port.
  socket.close();   //close active connections.
  nIO.reset();
  nIO.run(); //let clients know that we're disconnecting.
}

void Uplink::parse_header(const boost::system::error_code& error,
     std::size_t bytes_transferred) {
  if(error || bytes_transferred!=headerSize) {
    disconnect();
    return;
  }
  currentPacketID = headerBuffer[0];
  memcpy(&currentPacketLength, &headerBuffer[1], sizeof(unsigned int));
  //move to read data state

  //TODO: move to different states to parse various packet types.

  async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength),
      bind(&Uplink::parse_data, this, _1, _2));
}

void Uplink::parse_data(const boost::system::error_code& error,
   std::size_t bytes_transferred) {
  if(error) {
    disconnect();
    return;
  }

  if(bytes_transferred != currentPacketLength) {
    cout<<"bytes_transferred != currentPacketLength"<<endl;
    disconnect();
    return;
  }

  //move back into the header reading state
  async_read(socket, buffer(headerBuffer), 
      bind(&Uplink::parse_header, this, _1, _2));
}

void Uplink::disconnect() {
  acceptor.close();
  socket.close();
  acceptor.open(endpoint.protocol());
  acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
  acceptor.bind(endpoint);
  acceptor.listen(1);  //1 means only one client in connect queue.
  acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
}

void Uplink::accept_handler(const boost::system::error_code& error)
{
  if (!error) {
    //no more clents.
    acceptor.close();
    //move to read header state.
    async_read(socket, buffer(headerBuffer), 
        bind(&Uplink::parse_header, this, _1, _2));
  }
}

void Uplink::sendASCIIMessage(const std::string &m) {
  //Format the message
  unsigned int msgLength(m.length());
  vector<char> outBuffer(msgLength+headerSize);
  outBuffer[0] = MSG_ASCII;
  memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
  vector<char>::iterator dataBegin(outBuffer.begin());
  advance(dataBegin, headerSize);
  copy(m.begin(), m.end(), dataBegin);
  //queue the message
  addToQueue(outBuffer);
}

void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) {
  //Format the message.
    //a,g,m, 3 components each plus a stamp
  const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int)); 
  vector<char> outBuffer(msgLength+headerSize);
  outBuffer[0] = MSG_IMU;
  memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));

  const Eigen::Vector3d a(d->getAccel());
  const Eigen::Vector3d m(d->getMag());
  const Eigen::Vector3d g(d->getGyro());
  const unsigned int s(d->getUpdateStamp());

  memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3);
  memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3);
  memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3);
  memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int));

  /*
  cout<<"----------------------------------------"<<endl;
  cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl;
  cout<<"Mag   = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl;
  cout<<"Gyro  = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl;
  cout<<"Stamp = "<<s<<endl;
  cout<<"----------------------------------------"<<endl;
  */

  //queue the message
  addToQueue(outBuffer);
}

void Uplink::send_handler(const boost::system::error_code& error,
     std::size_t bytes_transferred) {
  {
    lock_guard<mutex> l(queueLock);
    lock_guard<mutex> l2(sendingLock);
    if(outQueue.empty()) {
      currentlySending = false;
      return;
    }
  }
  startSend();
}

void Uplink::addToQueue(const std::vector<char> &out) {
  bool needsRestart = false;
  {
    lock_guard<mutex> l(queueLock);
    lock_guard<mutex> l2(sendingLock);
    outQueue.push(out);
    needsRestart = !currentlySending;
  }
  if(needsRestart)
    nIO.post(bind(&Uplink::startSend, this));
}

void Uplink::startSend() {
  lock_guard<mutex> l(queueLock);
  lock_guard<mutex> l2(sendingLock);
  if(outQueue.empty())
    return;
  currentlySending = true;
  currentWrite = outQueue.front();
  outQueue.pop();
  async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler, 
       this, _1, _2));
}

void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height, 
          const unsigned int cameraID, const unsigned char *frameData) {
  //                             image data            image metadata        header
  const unsigned int packetSize(width*height*3   +   sizeof(unsigned int)*3 + headerSize);
  const unsigned int dataSize(width*height*3   +   sizeof(unsigned int)*3);
  vector<char> outgoingBuffer(packetSize);
  outgoingBuffer[0] = MSG_FRAME_BGR;
  memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int));
  char *writePtr = &outgoingBuffer[headerSize];
  memcpy(writePtr, &width, sizeof(unsigned int));
  writePtr += sizeof(unsigned int);
  memcpy(writePtr, &height, sizeof(unsigned int));
  writePtr += sizeof(unsigned int);
  memcpy(writePtr, &cameraID, sizeof(unsigned int));
  writePtr += sizeof(unsigned int);
  memcpy(writePtr, frameData, width*height*3*sizeof(char));

  //TODO: can we avoid the whole image copy here?
  //TODO: should come up with a better packet buffer build system.
  //IDEA!: maybe have a "request buffer" funxction so the Uplink
  //class can have sole ownership, rather than do the copy in "addtoQueue"
  addToQueue(outgoingBuffer);
}

This program works most of the time, but only rarely, when sending a lot of data with no delay between packets it will fail. For example:

sendVideoFrameBGR(...);  //occasional fail
sendASCIIMessage("...");

sendVideoFrameBGR(...);  //never fails.
sleep(1); 
sendASCIIMessage("...");

after handling a video frame in Downlink it goes back to the hadleHeaderData and waits for a packet that is several megabytes in length and for a packet ID that doesn't exist. Somehow the stream is getting corrupted. I don't know why.

I don't really care much for the code I have written now, so if anybody knows of a good class or library to parse streams over TCP into buffer blocks for me I'd rather use that.

EDIT:
Here is the exact code that runs the sending of data:

    if(frontImage) {
      uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0,
                   (unsigned char*)frontImage->imageData);
      cout<<"Sent"<<endl;
      //sleep(1);   //works fine if this is uncommented !
    }

    uplink.sendASCIIMessage("Alive...");
    sleep(1);
    uplink.sendIMUDataBlock(imuDataBlock.get());
    cout<<"Loop"<<endl;
    sleep(1);
  }
+3  A: 

The problem is most likely that your ioservice object has more than one thread handling work.

When you call the second send function immediately after the first, the two function objects posted to the ioservice are probably being delegated to different threads. So basically, two writes are occurring on the same socket in parallel. This is most likely illegal. Using Winsock2 with non-blocking sockets, this would cause the outgoing data to be corrupted.

Even though you use a bool to check whether it's currently sending, the bool isn't checked until one of the ioservice threads is handling the function. If two ioservice threads are active when you post the two pieces of work, it could dispatch both sends at the same time, causing the two send functions to occur asynchronously on separate threads. The 'is currently sending' check may be returning false in both calls, since the two sends are running in parallel.

dauphic