tags:
views: 93
answers: 5

Hello everybody!

First, a little bit of context to explain why I am on the "UDP sampling" route:
I would like to sample data produced at a fast rate for an unknown period of time. The data I want to sample is produced on a different machine from the one consuming it. I have a dedicated Ethernet connection between the two, so bandwidth is not an issue. The problem is that the machine consuming the data is much slower than the one producing it. An added constraint is that while it's OK if I don't get all the samples (they are just samples), it is mandatory that I get the last one.

My first solution was to make the data producer send a UDP datagram for each produced sample, and let the data consumer read the samples it could while the socket layer discards the rest when the UDP socket buffer is full. The problem with this solution is that when new datagrams arrive and the socket buffer is full, it is the new datagrams that get discarded, not the old ones. Therefore I am not guaranteed to have the last one!

My question is: is there a way to make a UDP socket replace old datagrams when new ones arrive?

The receiver is currently a Linux machine, but that could change in favor of another Unix-like OS in the future (Windows may be possible since it implements BSD sockets, but is less likely).
The ideal solution would use widespread mechanisms (like setsockopt()) to work.

PS: I thought of other solutions, but they are more complex (they involve heavy modification of the sender), so I would first like a definite answer on the feasibility of what I am asking! :)

Updates:
- I know that the OS on the receiving machine can handle the network load plus reassembly of the traffic generated by the sender. It's just that its default behaviour is to discard new datagrams when the socket buffer is full. And because of the processing time in the receiving process, I know the buffer will fill up whatever I do (wasting half of the memory on a socket buffer is not an option :)).
- I would really like to avoid having a helper process do what the OS could have done at packet-dispatch time, wasting resources just copying messages into shared memory.
- The problem I see with modifying the sender is that the code I have access to is just a PleaseSendThisData() function; it has no knowledge that a given call can be the last one for a long time, so I don't see any doable tricks on that end... but I'm open to suggestions! :)

If there is really no way to change the UDP receiving behaviour of a BSD socket, then well... just tell me; I am prepared to accept this terrible truth and will start working on the "helper process" solution when I get back to it :)

+5  A: 

Just set the socket to non-blocking, and loop on recv() until it returns < 0 with errno == EAGAIN. Then process the last packet you got, rinse and repeat.
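A minimal sketch of that drain loop (my own code, not caf's): `drain_latest()` reads until the queue is empty and keeps only the newest datagram, and the hypothetical `demo()` harness sends itself three datagrams over loopback to show the effect.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

/* Read until the queue is drained; return the length of the newest
   datagram, or -1 if nothing was waiting (or a real error occurred). */
static ssize_t drain_latest(int fd, char *buf, size_t buflen)
{
    ssize_t newest = -1, len;
    while ((len = recv(fd, buf, buflen, 0)) >= 0)
        newest = len;                     /* keep overwriting: latest wins */
    return (errno == EAGAIN || errno == EWOULDBLOCK) ? newest : -1;
}

/* Hypothetical self-test: send datagrams numbered 1..3 over loopback,
   then drain. Returns the sequence number of the last datagram kept. */
int demo(void)
{
    int rx = socket(AF_INET, SOCK_DGRAM, 0);
    int tx = socket(AF_INET, SOCK_DGRAM, 0);
    struct sockaddr_in addr = { 0 };
    socklen_t alen = sizeof(addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    bind(rx, (struct sockaddr *) &addr, sizeof(addr));      /* any free port */
    getsockname(rx, (struct sockaddr *) &addr, &alen);      /* learn the port */
    fcntl(rx, F_SETFL, fcntl(rx, F_GETFL, 0) | O_NONBLOCK); /* non-blocking, as caf suggests */

    for (char seq = 1; seq <= 3; seq++)
        sendto(tx, &seq, 1, 0, (struct sockaddr *) &addr, sizeof(addr));
    usleep(50000);                        /* give loopback time to deliver */

    char buf[64];
    ssize_t len = drain_latest(rx, buf, sizeof(buf));
    close(rx);
    close(tx);
    return len == 1 ? buf[0] : -1;
}
```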

caf
The problem is that the receiving machine is much slower than the sending machine, and while it is processing the last datagram in the socket buffer, the buffer may fill up again and lose the real last datagram of the "transmission".
Nawak
@Nawak: To work around that, you can set a large enough receive buffer on the socket, as valdo explains.
caf
+3  A: 

I agree with "caf". Set the socket to a non-blocking mode.

Whenever you receive something on the socket - read in a loop until nothing more is left. Then handle the last read datagram.

Only one note: you should set a large receive buffer on the socket:

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
               (char *) &nRcvBufSize, sizeof(nRcvBufSize)) < 0)
    perror("setsockopt(SO_RCVBUF)"); // check that the kernel accepted it
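One caveat worth knowing (my aside, not part of valdo's answer): on Linux the kernel doubles the value passed to SO_RCVBUF to account for bookkeeping overhead, and silently caps it at the net.core.rmem_max sysctl, so it is worth reading the option back to see what you actually got:

```c
#include <sys/socket.h>
#include <unistd.h>

/* Ask for a receive buffer of `requested` bytes and return what the
   kernel actually granted (on Linux, roughly twice the request,
   capped at net.core.rmem_max). */
int request_rcvbuf(int requested)
{
    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    int granted = 0;
    socklen_t optlen = sizeof(granted);
    setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &requested, sizeof(requested));
    getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &granted, &optlen);
    close(sock);
    return granted;
}
```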
valdo
+1  A: 

Another idea is to have a dedicated reader process that does nothing but loop on the socket and read incoming packets into a circular buffer in shared memory (you'll have to worry about proper write ordering). Something like kfifo. Non-blocking mode is fine here too. New data overwrites old data. The other process(es) would then always have access to the latest block at the head of the queue, plus all the previous chunks not yet overwritten.

Might be too complicated for a simple one-way reader, just an option.

Nikolai N Fetissov
Yes, my initial fear was that this would be the only solution. I don't like it because it would waste a bit of the already scarce resources on the receiver... I was hoping for some trick in the BSD socket API that would allow overwriting old messages instead of discarding new ones. That way the OS would do almost the same amount of work, and there would remain enough resources for the processing part.
Nawak
No, sockets don't work like that. Another option is to coalesce the updates on the faster machine and publish compound data on a coarser timer.
Nikolai N Fetissov
+2  A: 

This will be difficult to get completely right on the listener side alone, since the program could miss the last packet while it is still in the network interface card, never having had a chance to see it.

The operating system's UDP code would be the best place to deal with this, since it sees new packets even when it decides to discard them because too many are already queued up. It could then decide whether to drop an old packet or the new one, but I don't know of a way to tell it that this is what you want.

You can try to deal with this on the receiver by having one program or thread that always tries to read in the newest packet and another that always tries to process that newest packet. How to do this differs depending on whether you use two separate programs or two threads.

As threads, you would need a mutex (a semaphore or something like it) to protect a pointer (or reference) to a structure used to hold one UDP payload and whatever else you want in there (size, sender IP, sender port, timestamp, etc.).

The thread that actually reads packets from the socket would store the packet's data in a struct, acquire the mutex protecting the shared pointer, swap the current pointer for a pointer to the struct it just filled, release the mutex, signal the processor thread that it has something to do, and then clear out the struct it received in the swap and use it to hold the next packet that comes in.

The thread that actually processes packet payloads should wait for the signal from the other thread and/or wake periodically (500 ms or so is probably a good starting point, but you decide), acquire the mutex, swap its own UDP payload struct pointer with the shared one, and release the mutex. If the struct contains packet data, it should process it and then wait for the next signal; if it holds no data, it should just go straight back to waiting for the next signal.

The processor thread should probably run at a lower priority than the UDP listener so that the listener is less likely to ever miss a packet. When processing the last packet (the one you really care about) the processor will not be interrupted because there are no new packets for the listener to hear.

You could extend this by using a queue rather than just a single pointer as the swapping place for the two threads. The single pointer is just a queue of length 1 and is very easy to process.

You could also extend this by having the listener thread detect whether multiple packets are waiting and only putting the last of those into the queue for the processor thread. How to do this differs by platform, but on a *nix system the FIONREAD ioctl should report 0 for a socket with nothing waiting:

while (keep_doing_this()) {
    ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
    // this could have been recv or recvfrom
    if (len < 0) {
        error();
    }
    int sz;
    int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
    if (rc < 0) {
        error();
    }
    if (!sz) {
        // There aren't any more packets ready, so queue up the one we got
        my_udp_packet->current_len = len;

        my_udp_packet = swap_udp_packet(my_udp_packet);
        /* swap_udp_packet is code you would have to write to implement what I talked
           about above. */

        tgkill(this_group, processor_thread_tid, SIGUSR1); /* Linux-specific; older C libraries expose it only via syscall(SYS_tgkill, ...) */
    } else if (sz > my_udp_packet->buf_len) {
        /* You could resize the buffer for the packet payload here if it is too small.*/
    }
}

A udp_packet struct would have to be allocated for each thread, plus one for the swapping pointer. If you use a queue for swapping, then you must have enough udp_packets for each position in the queue; since the pointer is just a queue of length 1, it only needs one.
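For what it's worth, here is one possible shape for the swap_udp_packet() helper left unwritten above, a sketch under the single-pointer design (the struct layout and names are my own assumptions): a mutex protects the shared pointer, and each caller hands in its packet and walks away with whatever was there before.

```c
#include <pthread.h>
#include <stddef.h>

struct udp_packet {
    char   buf[2048];
    size_t buf_len;
    size_t current_len;
};

static pthread_mutex_t swap_lock = PTHREAD_MUTEX_INITIALIZER;
static struct udp_packet *shared_slot; /* the "queue of length 1" */

/* Hand in your packet, walk away with whatever was there before
   (NULL until the first swap has happened). */
struct udp_packet *swap_udp_packet(struct udp_packet *mine)
{
    pthread_mutex_lock(&swap_lock);
    struct udp_packet *theirs = shared_slot;
    shared_slot = mine;
    pthread_mutex_unlock(&swap_lock);
    return theirs;
}
```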

If you are on a POSIX system, consider not using a real-time signal for the signaling, because real-time signals queue up. A regular signal lets you treat being signaled many times the same as being signaled once, until the signal is handled. Waking up periodically to check the queue also lets you handle the case where the last signal arrives just after you checked for new packets but before you called pause() to wait for a signal.

nategoose
You are right that what I am looking for is a way to tell the OS to change its behaviour when dispatching new UDP datagrams. Your proposed algorithm is my "plan B" and I am increasingly feeling that it will be my only simple solution... :/ Thanks a lot for taking the time to explain it in such detail, it was an interesting read! PS: in the "plan B" solution I had in mind, I wasn't using signals but condition variables, which I like more because they are not asynchronous like signals.
Nawak
@Nawak: Using signals allows your processing thread to use no CPU time while there is no work to do. If this is a single-processor machine then the two threads are already competing for CPU time, and making the processor busy-wait will slow the whole thing down (likely more than it already is). In your signal handler you can increment a volatile int which the processor thread looks at when it wakes up. If the variable hasn't changed, the thread calls `pause` again and goes back to sleep.
nategoose
Condition variables are not "busy waiting". The thread calling pthread_cond_wait() is actually sleeping until it is awoken by the other thread's pthread_cond_signal() or pthread_cond_broadcast()... The added advantage is that the mutex protecting the shared memory can be used with the condition variable, and the awoken thread automatically owns that mutex on wake-up.
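A minimal sketch of that condition-variable variant (my illustration of the point, with a single int standing in for the shared packet): the listener overwrites under the mutex and signals; the processor sleeps in pthread_cond_wait() and owns the mutex again when it wakes.

```c
#include <pthread.h>

static pthread_mutex_t lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ready = PTHREAD_COND_INITIALIZER;
static int latest_sample = -1;     /* -1 = nothing published yet */

void publish(int sample)           /* called by the listener thread */
{
    pthread_mutex_lock(&lock);
    latest_sample = sample;        /* overwrite: latest wins */
    pthread_cond_signal(&ready);
    pthread_mutex_unlock(&lock);
}

int wait_for_sample(void)          /* called by the processor thread */
{
    pthread_mutex_lock(&lock);
    while (latest_sample < 0)      /* guard against spurious wakeups */
        pthread_cond_wait(&ready, &lock);
    int s = latest_sample;
    latest_sample = -1;            /* consume it */
    pthread_mutex_unlock(&lock);
    return s;
}
```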
Nawak
As everybody seems to think there's no way to modify the socket's behaviour, I am marking this question as answered, and I choose your answer since it is the most detailed. Thanks everybody!
Nawak
+1  A: 

I'm pretty sure that this is a provably insoluble problem closely related to the Two Army Problem.

I can think of a dirty solution: establish a TCP "control" sideband connection which carries the last packet and also serves as an "end of transmission" indication. Otherwise you need to use one of the more general pragmatic means noted under Engineering Approaches.

msw