views:

110

answers:

5

Performance tuning: writing data to multiple pipes

Now I'm doing it in a single thread:

for(unsigned int i = 0; i < myvector.size();)
{
    tmp_pipe = myvector[i];
    fSuccess = WriteFile( tmp_pipe, &Time, sizeof(double), &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    fSuccess = WriteFile( tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    fSuccess = WriteFile( tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    i++;
}

And the result is that the first pipe gets its data fastest, and the last pipe slowest.

I'm thinking of doing it in separate threads so each pipe is equally processed.

But how can I run a specific function in a thread asynchronously (so that the main thread returns immediately) in C/C++?

+2  A: 

You can use the CreateThread function to create a new thread and pass the pipe handle as a parameter to the thread function:

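DWORD WINAPI PipeThread(LPVOID param) {
  HANDLE hPipe = (HANDLE)param;
  // Do the WriteFile operations here
  return 0;
}

for(unsigned int i = 0; i < myvector.size(); i++) {
  HANDLE hThread = CreateThread(NULL, 0, PipeThread, myvector[i], 0, NULL);
  // The thread keeps running after its handle is closed;
  // closing it here just avoids leaking one handle per thread
  if (hThread != NULL)
    CloseHandle(hThread);
}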

Note that the vector class isn't thread-safe, so you'll face problems with myvector.erase if you don't synchronize access to it, for example using a critical section.
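A portable sketch of the same one-thread-per-pipe idea, using C++11 std::thread in place of CreateThread and std::mutex in place of a critical section. FakePipe and write_to_pipe() are hypothetical stand-ins (odd values simulate a client that closed its pipe). The point is that every access to the shared vector happens under the lock, and the loop that starts the threads walks a copy rather than the vector the threads modify. The threads are joined at the end only so the result can be checked.

```cpp
#include <algorithm>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical pipe handle and write routine: odd values simulate a closed pipe.
using FakePipe = int;
bool write_to_pipe(FakePipe p) { return p % 2 == 0; }

std::vector<FakePipe> pipes;   // shared by all worker threads
std::mutex pipes_mutex;        // guards every access to `pipes`

// Thread body: write, and on failure remove this pipe's entry under the lock.
void pipe_thread(FakePipe p) {
    if (!write_to_pipe(p)) {
        std::lock_guard<std::mutex> lock(pipes_mutex);
        pipes.erase(std::remove(pipes.begin(), pipes.end(), p), pipes.end());
    }
}

void write_all() {
    std::vector<FakePipe> snapshot;
    {
        std::lock_guard<std::mutex> lock(pipes_mutex);
        snapshot = pipes;      // start threads from a copy, not the shared vector
    }
    std::vector<std::thread> threads;
    for (FakePipe p : snapshot)
        threads.emplace_back(pipe_thread, p);
    for (auto& t : threads)
        t.join();
}
```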


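Update: Since you mentioned high frequency, you could use I/O completion ports instead of a separate thread for each pipe. You can then use overlapped I/O with WriteFile to perform the writes asynchronously, and have just one extra thread that listens for completion of the writes. This requires the pipe handles to have been opened with FILE_FLAG_OVERLAPPED, and each buffer passed to WriteFile must stay valid until its write completes: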

// Initial setup: associate every pipe handle with one completion port
// (the pipes must have been opened with FILE_FLAG_OVERLAPPED)
HANDLE hPort = CreateIoCompletionPort(myvector[0], NULL, 0, 1);
for (unsigned int i = 1; i < myvector.size(); i++)
  CreateIoCompletionPort(myvector[i], hPort, 0, 0);

// Start thread
CreateThread(NULL, 0, PipeThread, hPort, 0, NULL);

// Do this as many times as you want
// (buffer must stay valid until every write that uses it has completed)
for(unsigned int i = 0; i < myvector.size(); i++) {
  OVERLAPPED *ov = new OVERLAPPED;
  ZeroMemory(ov, sizeof *ov);
  if (!WriteFile(myvector[i], buffer, size, NULL, ov) &&
      GetLastError() != ERROR_IO_PENDING) {
    // Failed immediately (e.g. the pipe was closed): no completion
    // packet will be queued, so free the OVERLAPPED here
    delete ov;
  }
  // Otherwise the I/O is performed asynchronously and PipeThread frees ov
}

// Close the completion port at the end
// This should wake the thread blocked in GetQueuedCompletionStatus
CloseHandle(hPort);

---

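DWORD WINAPI PipeThread(LPVOID param) {
  HANDLE hPort = (HANDLE)param;
  DWORD nBytes;
  ULONG_PTR key;
  LPOVERLAPPED ov;

  // Continuously loop for I/O completion
  for (;;) {
    BOOL ok = GetQueuedCompletionStatus(hPort, &nBytes, &key, &ov, INFINITE);
    if (ov == NULL)
      break;  // no packet was dequeued, e.g. the port handle was closed
    if (!ok) {
      // A packet was dequeued, but that write failed
      // (for example, the client closed its end of the pipe)
    }
    delete ov;
    // Do anything else you may want to do here
  }

  return 0;
}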
casablanca
My above `for` loop is run at a very high frequency, so I can't call `CreateThread` each time; everything should assume that the thread has already been created.
COMer
@COMer I don't think this solution is meant to call CreateThread each time; the part that runs each time is the for loop marked "Do this as many times as you want".
A: 

Do you have writev() available? If so, you can reduce the three write operations to one per pipe, which would be more efficient. It would also slightly simplify the error handling, but even without it you could collapse what you have to:

for (unsigned int i = 0; i < myvector.size(); )
{
    tmp_pipe = myvector[i];
    if (!WriteFile(tmp_pipe, &Time,      sizeof(double), &dwWritten, NULL) ||
        !WriteFile(tmp_pipe, &BufferLen, sizeof(long),   &dwWritten, NULL) ||
        !WriteFile(tmp_pipe, pBuffer,    BufferLen,      &dwWritten, NULL))
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;   // don't advance i: the next pipe has moved into slot i
    }
    i++;
}

This is simpler to read in many ways: there is a third as much error handling, so the operational code is less hidden.

Of course, you would still want to wrap this code into threaded code, so the write operations would be handled by separate threads. You'd arrange for each thread to get its own pipe; they'd share read-only access to the time and buffer. Each thread would do its writing and return a status as it completes. The parent thread would wait for each of the child threads, and if a child thread reported that it failed, the corresponding client pipe would be removed from the vector. Since only the parent thread would manipulate the vector, there are no threading issues to worry about there.

In outline:

 for (i = 0; i < myvector.size(); i++)
      tid[i] = thread create (myvector[i], write_to_pipe);
 for (i = 0; i < myvector.size(); i++)
 {
      status = wait for thread(tid[i]);
      if (status != success)
           myvector.erase(...);
 }

The array (or vector) tid holds the thread identities. The write_to_pipe() function is the thread main function; it does the writing on the pipe it is passed and exits with the appropriate status.
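The outline can be sketched in portable C++11, with std::async standing in for "thread create" and std::future::get() for "wait for thread". FakePipe and write_to_pipe() are hypothetical placeholders. The parent walks the vector backwards: erasing while walking forwards would shift the remaining pipes and misalign them with their status entries.

```cpp
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical pipe type and writer; odd values simulate closed pipes.
using FakePipe = int;
bool write_to_pipe(FakePipe p) { return p % 2 == 0; }

// Fan the writes out, one task per pipe, then wait for all of them.
// Only this (parent) thread touches `pipes`, so no locking is needed.
void write_all_and_prune(std::vector<FakePipe>& pipes) {
    std::vector<std::future<bool>> status;
    for (FakePipe p : pipes)
        status.push_back(std::async(std::launch::async, write_to_pipe, p));

    // Walk backwards so erase() never shifts an entry still to be checked.
    for (std::size_t i = pipes.size(); i-- > 0; )
        if (!status[i].get())
            pipes.erase(pipes.begin() + static_cast<std::ptrdiff_t>(i));
}
```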

Jonathan Leffler
The main idea here is to reduce waiting, so I don't think `wait for thread` will improve the performance.
COMer
@COMer: I am under the impression that the objective was to get the data written to the multiple destinations concurrently. If the threads are left to run free, then the coordination problems are more severe - how does each thread know what to write, and when to write it; how does the coordinator thread know when it is safe to change the data values that are written; how does the coordinator know when one of the threads gives up because of a write failure.
Jonathan Leffler
A: 

Are these named pipes? If so, you can use FILE_FLAG_OVERLAPPED when you create them, which allows you to do asynchronous writes without dealing with threads. Here's an example from MSDN.

If these are anonymous pipes, overlapped I/O is not supported, so this might be a good reason to switch to named pipes.

Another option might be to queue a work item for each write, but this doesn't guarantee that all three writes will execute simultaneously.

bk1e
A: 

I would consider preparing the data in a way that reduces the number of I/O calls. Once you know that the data is being written in the most efficient way, with the fewest I/O calls possible, I would consider using asynchronous I/O. If performance is still not good enough, then consider adding more threads to the design.

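One way you may be able to reduce the number of writes is to combine all the data into one contiguous block, so that one write can be used instead of three. The pragmas are needed to get rid of any padding the compiler might otherwise add between the fields. Note that the payload bytes themselves must be in that block: a struct that merely holds a char* would write the pointer value, not the data it points to.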

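#include <cstring>
#include <vector>

#pragma pack(push,1)
struct PipeHeader {
   double _time;
   long _buffer_len;
};
#pragma pack(pop)

// Copy the header and the payload bytes into one contiguous buffer
std::vector<char> data(sizeof(PipeHeader) + BufferLen);
PipeHeader hdr = { Time, BufferLen };
memcpy(&data[0], &hdr, sizeof hdr);
memcpy(&data[0] + sizeof hdr, pBuffer, BufferLen);
DWORD data_len = (DWORD)data.size();

for(unsigned int i = 0; i < myvector.size();)
{
    tmp_pipe = myvector[i];
    fSuccess = WriteFile( tmp_pipe, &data[0], data_len, &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    i++;
}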
skimobear
A: 

To specifically answer your question "how can I run a specific function of thread asynchronously(the main thread should get return immediately) in c/c++?"

You can do this easily by decoupling the two acts. Create a pool of worker threads, each initialized with the pipe it writes to, and write a function that schedules a new job on these threads. Scheduling is nearly instantaneous compared to your pipe writes; each thread picks up its job and starts writing to its pipe while your main thread is free to continue with its own work.
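A minimal sketch of that idea in portable C++11, assuming one long-lived worker per pipe. PipeWorker is a hypothetical class, and the write callback stands in for the real WriteFile calls. schedule() only pushes a job onto a mutex-protected queue and returns, so the main thread never waits on the pipe itself.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

class PipeWorker {
public:
    explicit PipeWorker(std::function<void(const std::string&)> write)
        : write_(std::move(write)), thread_([this] { run(); }) {}

    ~PipeWorker() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_one();
        thread_.join();   // the worker drains queued jobs, then exits
    }

    // Called by the main thread: enqueue and return immediately.
    void schedule(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(m_);
            jobs_.push(std::move(msg));
        }
        cv_.notify_one();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(m_);
        for (;;) {
            cv_.wait(lock, [this] { return stop_ || !jobs_.empty(); });
            if (jobs_.empty()) return;   // stop requested and nothing left
            std::string msg = std::move(jobs_.front());
            jobs_.pop();
            lock.unlock();
            write_(msg);                 // the slow write runs without the lock
            lock.lock();
        }
    }

    std::function<void(const std::string&)> write_;
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::string> jobs_;
    bool stop_ = false;
    std::thread thread_;   // declared last so it starts after the members above
};
```

Jobs for one pipe are written in the order they were scheduled, because each pipe has exactly one worker draining its queue.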

This is an easy solution if the number of clients you have is small. If not, creating one thread per client will stop scaling after a point, and your server will get bogged down by lots of context switches and thread contention.

In that scenario, for a server that has to scale, you should give serious thought to casablanca's solution. It creates only a single thread to listen for completion notifications, and as of Windows Server 2003 it is the most efficient way to build servers on Windows.