views:

70

answers:

2

Hi all,

I'd like to develop a multithreaded UDP server in C/Linux. The service is running on a single port x, thus there's only the possibility to bind a single UDP socket to it. In order to work under high loads, I have n threads (statically defined), say 1 thread per CPU. Work could be delivered to the thread using epoll_wait, so threads get woken up on demand with 'EPOLLET | EPOLLONESHOT'. I've attached a code example:

static int epfd;
static sig_atomic_t sigint = 0;

...

/* Thread routine with epoll_wait */
static void *process_clients(void *pevents)
{
    int rc, i, sock, nfds;
    struct epoll_event ep, *events = (struct epoll_event *) pevents;

    while (!sigint) {
        nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500);

        for (i = 0; i < nfds; ++i) {
           if (events[i].data.fd < 0)
                continue;

           sock = events[i].data.fd;

           if((events[i].events & EPOLLIN) == EPOLLIN) {
               printf("Event dispatch!\n");
               handle_request(sock); // do a recvfrom
           } else
               whine("Unknown poll event!\n");

           memset(&ep, 0, sizeof(ep));
           ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
           ep.data.fd = sock;

           rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep);
           if(rc < 0)
               error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");
       }
    }

    pthread_exit(NULL);
}

int main(int argc, char **argv)
{
    int rc, i, cpu, sock, opts;
    struct sockaddr_in sin;
    struct epoll_event ep, *events;
    char *local_addr = "192.168.1.108";
    void *status;
    pthread_t *threads = NULL;
    cpu_set_t cpuset;

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM);
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM);

    sock = socket(PF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        error_and_die(EXIT_FAILURE, "Cannot create socket!\n");

    /* Non-blocking */
    opts = fcntl(sock, F_GETFL);
    if(opts < 0)
        error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n");
    opts |= O_NONBLOCK;
    rc = fcntl(sock, F_SETFL, opts);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n");

    /* Initial epoll setup */
    epfd = epoll_create(MAX_EVENT_NUM);
    if(epfd < 0)
        error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n");

    memset(&ep, 0, sizeof(ep));
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ep.data.fd = sock;

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep);
    if(rc < 0)
        error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n");

    /* Socket binding */
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = inet_addr(local_addr);
    sin.sin_port = htons(port_xy);

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
    if (rc < 0)
        error_and_die(EXIT_FAILURE, "Problem binding to port! "
                      "Already in use?\n");

    register_signal(SIGINT, &signal_handler);

    /* Thread initialization */
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_create(&threads[i], NULL, process_clients, events);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);

        rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Cannot create pthread!\n");

        cpu = (cpu + 1) % NR_CPUS_ON;
    }

    printf("up and running!\n");

    /* Thread joining */
    for (i = 0; i < MAX_THRD_NUM; ++i) {
        rc = pthread_join(threads[i], &status);
        if (rc != 0)
            error_and_die(EXIT_FAILURE, "Error on thread exit!\n");
    }

    close(sock);
    xfree(threads);
    xfree(events);

    printf("shut down!\n");

    return 0;
}

Is this the proper way of handling this scenario with epoll? Should the function _handle_request_ return as fast as possible, because for this time the eventqueue for the socket is blocked?!

Thanks for replies!

A: 

No, this will not work the way you want to. To have worker threads process events arriving through an epoll interface, you need a different architecture.

Example design (there are several ways to do this) Uses: SysV/POSIX semaphores.

  • Have the master thread spawn n subthreads and a semaphore, then block epolling your sockets (or whatever).

  • Have each subthread block on down-ing the semaphore.

  • When the master thread unblocks, it stores the events in some global structure and ups the semaphore once per event.

  • The subthreads unblock, process the events, block again when the semaphore returns to 0.

You can use a pipe shared among all threads to achieve very similar functionality to that of the semaphore. This would let you block on select() instead of the semaphore, which you can use to wake the threads up on some other event (timeouts, other pipes, etc.)

You can also reverse this control, and have the master thread wake up when its workers demand tasks. I think the above approach is better for your case, though.

Santiago Lezica
Isn't this similar to what epoll does internally? It has a kind of an event queue and an event dispatcher. threads get waken up on demand with epoll_wait?! I read this thead on the LKML where the PowerDNS developer had a similar question (http://www.gossamer-threads.com/lists/linux/kernel/1197050)...
Daniel
You made me doubt there,... I'm fairly sure, however, that having multiple threads waiting on the same epoll descriptor causes the Thundering Herd problem. Wait, I'm not entirely sure now. Dammit.
Santiago Lezica
You only get a thundering herd if you are not using EPOLLET (edge-triggered). BTW, depending on what you are doing in handle_request you can probably get away without using EPOLLONESHOT. But still, epoll doesn't make much sense if you only have a single socket.
cmeerw
Thanks for your answer! As mentioned on the link, I was wondering about the PowerDNS Recursor, which is a UDP server running on a specific port using epoll and a thread pool. Obviously it is very performant, but may be I am just wrong ... hmm.
Daniel
The case is slightly different for PowerDNS (at least different from what you told us in your original question). PowerDNS uses a single socket for receiving client requests, but it then also creates sockets for outgoing requests/responses (to upstream servers) - and that's why it needs to use epoll.
cmeerw
Okay, thanks for pointing this out!
Daniel
+1  A: 

As you are only using a single UDP socket, there is no point using epoll - just use a blocking recvfrom instead.

Now, depending on the protocol you need to handle - if you can process each UDP packet individually - you can actually call recvfrom concurrently from multiple threads (in a thread pool). The OS will take care that exactly one thread will receive the UDP packet. This thread can then do whatever it needs to do in handle_request.

However, if you need to process the UDP packets in a particular order, you'll probably not have that many opportunities to parallalise your program...

cmeerw
Exactly what I was going to say :)
MarkR