I wanted to write a server that a client could connect to and receive periodic updates from without having to poll. The problem I have run into with asyncore is that if dispatcher.writable() does not return True when it is called, you have to wait until asyncore.loop() has timed out (the default is 30 s) before the socket is checked for writing again.

The two ways I have tried to work around this are 1) reducing the timeout to a low value or 2) querying the connections for when they will next update and generating an adequate timeout value. However, if you refer to 'Select Law' in 'man 2 select_tut', it states, "You should always try to use select() without a timeout."

Is there a better way to do this? Twisted maybe? I wanted to try and avoid extra threads. I'll include the variable timeout example here:

#!/usr/bin/python

import time
import socket
import asyncore


# in seconds
UPDATE_PERIOD = 4.0

class Channel(asyncore.dispatcher):

    def __init__(self, sock, sck_map):
        asyncore.dispatcher.__init__(self, sock=sock, map=sck_map)
        self.last_update = 0.0  # should update immediately
        self.send_buf = ''
        self.recv_buf = ''

    def writable(self):
        return len(self.send_buf) > 0

    def handle_write(self):
        nbytes = self.send(self.send_buf)
        self.send_buf = self.send_buf[nbytes:]

    def handle_read(self):
        print 'read'
        print 'recv:', self.recv(4096)

    def handle_close(self):
        print 'close'
        self.close()

    # added for variable timeout
    def update(self):
        if time.time() >= self.next_update():
            self.send_buf += 'hello %f\n'%(time.time())
            self.last_update = time.time()

    def next_update(self):
        return self.last_update + UPDATE_PERIOD


class Server(asyncore.dispatcher):

    def __init__(self, port, sck_map):
        asyncore.dispatcher.__init__(self, map=sck_map)
        self.port = port
        self.sck_map = sck_map
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind( ("", port))
        self.listen(16)
        print "listening on port", self.port

    def handle_accept(self):
        (conn, addr) = self.accept()
        Channel(sock=conn, sck_map=self.sck_map)

    # added for variable timeout
    def update(self):
        pass

    def next_update(self):
        return None


sck_map = {}

server = Server(9090, sck_map)
while True:
    next_update = time.time() + 30.0
    for c in sck_map.values():
        c.update()  # <-- fill write buffers
        n = c.next_update()
        #print 'n:',n
        if n is not None:
            next_update = min(next_update, n)
    _timeout = max(0.1, next_update - time.time())

    asyncore.loop(timeout=_timeout, count=1, map=sck_map)
+2  A: 

The "select law" doesn't apply to your case, as you have not only client-triggered (pure server) activities, but also time-triggered activities - this is precisely what the select timeout is for. What the law should really say is "if you specify a timeout, make sure you actually have to do something useful when the timeout arrives". The law is meant to protect against busy-waiting; your code does not busy-wait.

I would not set _timeout to the maximum of 0.1 and the time remaining until the next update, but to the maximum of 0.0 and that remaining time. In other words, if an update period has expired while you were doing updates, you should do that specific update right away.
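
A minimal sketch of that clamping, assuming the main loop already knows the earliest deadline. The names select_timeout and max_wait are hypothetical and not part of asyncore:

```python
import time

def select_timeout(next_update, now=None, max_wait=30.0):
    """Seconds to pass to the select loop: never negative, never above max_wait."""
    if now is None:
        now = time.time()
    # An overdue update gives 0.0, so the loop polls once and returns at once.
    return min(max_wait, max(0.0, next_update - now))

# Deadline already 1.5 s in the past
print(select_timeout(next_update=100.0, now=101.5))
# Deadline 2.5 s in the future
print(select_timeout(next_update=100.0, now=97.5))
```

The result would replace the max(0.1, ...) expression in the question's loop.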

Instead of asking each channel every time whether it wants to update, you could store all channels in a priority queue (sorted by next-update time), and then only run update for the earliest channels (until you find one whose update time has not arrived). You can use the heapq module for that.

You can also save a few system calls by not having each channel ask for the current time, but only poll the current time once, and pass it to .update.
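
The two suggestions above (a heapq-based queue and a single time reading passed to update) could be combined roughly as follows. This is only a sketch: UpdateQueue and FakeChannel are hypothetical names, the channel's update method is assumed to accept the current time, and the period is assumed to be positive.

```python
import heapq
import itertools

class UpdateQueue(object):
    """Channels ordered by next-update time (hypothetical helper, not part of asyncore)."""

    def __init__(self):
        self._heap = []
        self._tie = itertools.count()  # tie-breaker so channels are never compared

    def add(self, channel, when):
        heapq.heappush(self._heap, (when, next(self._tie), channel))

    def run_due(self, now, period):
        """Update every channel that is due; return the next deadline, or None."""
        while self._heap and self._heap[0][0] <= now:
            _, _, channel = heapq.heappop(self._heap)
            channel.update(now)              # the single time reading is passed in
            self.add(channel, now + period)  # reschedule (period must be > 0)
        return self._heap[0][0] if self._heap else None


class FakeChannel(object):
    """Stand-in for the question's Channel; it only records what it would send."""

    def __init__(self):
        self.sent = []

    def update(self, now):
        self.sent.append('hello %f\n' % now)


queue = UpdateQueue()
a, b = FakeChannel(), FakeChannel()
queue.add(a, 0.0)    # due immediately
queue.add(b, 10.0)   # not due yet
next_deadline = queue.run_due(now=5.0, period=4.0)
print(next_deadline)
```

Only channel a is touched here; b is never asked. A real server would also need to drop closed channels from the queue, which this sketch leaves out.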

Martin v. Löwis
+1  A: 

I would use Twisted. It has been a long time since I used asyncore, but I think this should be the Twisted equivalent (not tested, written from memory):

from twisted.internet import reactor, protocol
import time

UPDATE_PERIOD = 4.0

class MyClient(protocol.Protocol):

    def connectionMade(self):
        self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update)

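    def connectionLost(self, reason):
        # Only cancel a call that is still pending
        if self.updateCall.active():
            self.updateCall.cancel()

    def update(self):
        self.transport.write("hello %f\n" % (time.time(),))
        # Schedule the next update; without this only one message is sent
        self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update)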

    def dataReceived(self, data):
        print "recv:", data


f = protocol.ServerFactory()
f.protocol = MyClient

reactor.listenTCP(9090, f)
reactor.run()
truppo
+1: with Twisted, code becomes readable and easy to maintain.
nosklo
The code works (pretty good if from memory!) but needed a small change: update() has to call reactor.callLater() again to schedule the next update. Otherwise you only get one message, and updateCall.cancel() will fail on disconnection. My only problem is that Twisted adds an extra dependency, but I will have to weigh this against actual productivity and readability.
Nick Sonneveld
A: 

Nick: What is the small change needed to make it work? Could you post the code? Thanks.

virk
A: 

Maybe you can do this with sched.scheduler, like this (n.b. not tested):

import sched, asyncore, time

def _poll_loop(timeout):
    # Delay function for the scheduler: instead of sleeping, service the
    # sockets until `timeout` seconds have passed.
    deadline = time.time() + timeout
    while True:
        # Poll at least once, even when the timeout is zero
        asyncore.loop(max(0.0, deadline - time.time()), count=1)
        if time.time() >= deadline:
            break  # the full delay has elapsed
        # Otherwise a socket event made the loop return early, so keep polling

# Create a scheduler with a delay function that calls asyncore.loop
scheduler = sched.scheduler(time.time, _poll_loop)

# Add the update timeouts with scheduler.enter
# ...

def main_loop():
    while True:
        if scheduler.empty():
            asyncore.loop(30.0, count=1)  # just default timeout, use what suits you
            # add other work that might create scheduled events here
        else:
            scheduler.run()
demiurgus
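
As an illustration of how a periodic update might be registered with sched.scheduler, here is a small sketch in which the event re-enters itself each time it fires. FakeClock stands in for the real time and delay functions so the example finishes instantly. It is hypothetical, as are send_update and the three-update limit. In the server, the delay function would be the asyncore polling function instead.

```python
import sched

UPDATE_PERIOD = 4.0

class FakeClock(object):
    """Hypothetical clock whose sleep() just advances the time (keeps the demo instant)."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
sent = []

def send_update():
    # Stand-in for appending to a channel's send buffer
    sent.append('hello %f\n' % clock.time())

def fire():
    send_update()
    if len(sent) < 3:  # stop after three updates so the demo terminates
        scheduler.enter(UPDATE_PERIOD, 1, fire, ())

scheduler.enter(UPDATE_PERIOD, 1, fire, ())
scheduler.run()
print(sent)
```

Without the limit the event would re-enter itself forever, which is the behaviour a long-running server wants.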