I currently have a Python application where newline-terminated ASCII strings are being transmitted to me via a TCP/IP socket. I have a high data rate of these strings and I need to parse them as quickly as possible. Currently, the strings are being transmitted as CSV and if the data rate is high enough, my Python application starts to lag behind the input data rate (probably not all that surprising).

The strings look something like this:

chan,2007-07-13T23:24:40.143,0,0188878425-079,0,0,True,S-4001,UNSIGNED_INT,name1,module1,...

I have a corresponding object that will parse these strings and store all of the data into an object. Currently the object looks something like this:

class ChanVal(object):
    def __init__(self, csvString=None,**kwargs):

        if csvString is not None:
            self.parseFromCsv(csvString)

        for key in kwargs:
            setattr(self,key,kwargs[key])

    def parseFromCsv(self, csvString):

        lst = csvString.split(',')

        self.eventTime=lst[1]
        self.eventTimeExact=long(lst[2])
        self.other_clock=lst[3]
        ...

To read the data in from the socket, I'm using a basic "socket.socket(socket.AF_INET, socket.SOCK_STREAM)" (my app is the server socket), and then I'm constantly polling that socket for new input with the "poll(...)" method of a "select.poll()" object from the "select" module.
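For reference, here's a minimal sketch of that kind of setup (a single client is assumed, and the port and buffer size are arbitrary):

import socket
import select

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', 9000))                  # arbitrary port, for illustration only
server.listen(1)
conn, addr = server.accept()

poller = select.poll()
poller.register(conn, select.POLLIN)

buf = ''
while True:
    for fd, event in poller.poll():      # block until the socket is readable
        data = conn.recv(4096)
        if not data:                     # sender closed the connection
            raise SystemExit
        buf += data
        while '\n' in buf:               # peel off complete lines
            line, buf = buf.split('\n', 1)
            ChanVal(line)                # parse as before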

I have some control over the process sending the data (meaning I can get the sender to change the format), but it would be really convenient if we could speed up the ASCII processing enough to not have to use fixed-width or binary formats for the data.

So up until now, here are the things I've tried that haven't really made much of a difference:

  1. Using the string "split" method and then indexing the list of results directly (see above), but "split" seems to be really slow.
  2. Using the "reader" object in the "csv" module to parse the strings.
  3. Changing the strings being sent to a string format that I can use to directly instantiate an object via "eval" (e.g. sending something like "ChanVal(eventTime='2007-07-13T23:24:40.143',eventTimeExact=0,...)").

I'm trying to avoid going to a fixed-width or binary format, though I realize those would probably ultimately be much faster.

Ultimately, I'm open to suggestions on better ways to poll the socket, better ways to format/parse the data (though hopefully we can stick with ASCII) or anything else you can think of.

Thanks!

+3  A: 

You can't make Python faster. But you can make your Python application faster.

Principle 1: Do Less.

You can't do less input parsing overall, but you can do less input parsing in the process that's also reading the socket and doing everything else with the data.

Generally, do this.

Break your application into a pipeline of discrete steps.

  1. Read the socket, break into fields, create a named tuple, write the tuple to a pipe with something like pickle.

  2. Read a pipe (with pickle) to construct the named tuple, do some processing, write to another pipe.

  3. Read a pipe, do some processing, write to a file or something.

Each of these three processes, connected with OS pipes, runs concurrently: the first process reads the socket and makes tuples, while the second process consumes tuples and does its calculations, while the third process does its calculations and writes a file.

This kind of pipeline maximizes what your CPU can do, without too many painful tricks.

Reading and writing to pipes is trivial, since Linux assures you that sys.stdin and sys.stdout will be pipes when the shell creates the pipeline.

Before doing anything else, break your program into pipeline stages.

proc1.py

import sys
import cPickle
from collections import namedtuple

ChanVal= namedtuple( 'ChanVal', ['eventTime','eventTimeExact', 'other_clock', ... ] )
# 'socket' is assumed here to be a file-like object wrapping the accepted
# connection (e.g. conn.makefile()), so it can be iterated line by line
for line in socket:
    c= ChanVal( *line.rstrip('\n').split(',') )
    cPickle.dump( c, sys.stdout )

proc2.py

import sys
import cPickle
from collections import namedtuple
ChanVal= namedtuple( 'ChanVal', ['eventTime','eventTimeExact', 'other_clock', ... ] )
while True:
    try:
        item = cPickle.load( sys.stdin )
    except EOFError:                     # upstream process closed the pipe
        break
    # processing
    cPickle.dump( item, sys.stdout )

This idea of processing namedtuples through a pipeline is very scalable.

python proc1.py | python proc2.py
S.Lott
+1. On modern multi-core systems, this approach can be very effective.
Kevin Little
On rotten old single-core systems, this is also very effective. A great deal of this kind of processing is bound by physical I/O; splitting it up into separate processes disentangles the socket I/O from the disk I/O.
S.Lott
A: 

You need to profile your code to find out where the time is being spent.

That doesn't necessarily mean using Python's profiler.

For example, you can just try parsing the same CSV string a large number of times with different methods. Choose the fastest method, then divide the total time by the number of runs: now you know how much CPU time it takes to parse one string.

Try to break the program into parts and work out what resources are really required by each part.

The parts that need the most CPU per input line are your bottlenecks.

On my computer, the program below outputs this:

ChanVal0 took 0.210402965546 seconds
ChanVal1 took 0.350302934647 seconds
ChanVal2 took 0.558166980743 seconds
ChanVal3 took 0.691503047943 seconds

So you can see that about half the time is taken up by parseFromCsv, but also that quite a lot of time goes into extracting the values and storing them in the class.

If the class isn't used right away, it might be faster to store the raw data and use properties to parse the csvString on demand (see the sketch after the timing program).

from time import time
import re

# Baseline: store the raw string without parsing it
class ChanVal0(object):
    def __init__(self, csvString=None,**kwargs):
        self.csvString=csvString
        for key in kwargs:
            setattr(self,key,kwargs[key])

# Split the string, but don't extract or convert any fields
class ChanVal1(object):
    def __init__(self, csvString=None,**kwargs):
        if csvString is not None:
            self.parseFromCsv(csvString)
        for key in kwargs:
            setattr(self,key,kwargs[key])

    def parseFromCsv(self, csvString):
        self.lst = csvString.split(',')

# The question's approach: split, then extract and convert the fields
class ChanVal2(object):
    def __init__(self, csvString=None,**kwargs):
        if csvString is not None:
            self.parseFromCsv(csvString)
        for key in kwargs:
                setattr(self,key,kwargs[key])

    def parseFromCsv(self, csvString):
        lst = csvString.split(',')
        self.eventTime=lst[1]
        self.eventTimeExact=long(lst[2])
        self.other_clock=lst[3]


# Regex alternative: match named groups straight into __dict__
class ChanVal3(object):
    splitter=re.compile("[^,]*,(?P<eventTime>[^,]*),(?P<eventTimeExact>[^,]*),(?P<other_clock>[^,]*)")
    def __init__(self, csvString=None,**kwargs):
        if csvString is not None:
            self.parseFromCsv(csvString)
        self.__dict__.update(kwargs)

    def parseFromCsv(self, csvString):
        self.__dict__.update(self.splitter.match(csvString).groupdict())


s="chan,2007-07-13T23:24:40.143,0,0188878425-079,0,0,True,S-4001,UNSIGNED_INT,name1,module1"
RUNS=100000

for cls in ChanVal0, ChanVal1, ChanVal2, ChanVal3:
    start_time = time()
    for i in xrange(RUNS):
        cls(s)
    print "%s took %s seconds"%(cls.__name__, time()-start_time) 
gnibbler