This question is related to others I have asked on here, mainly regarding sorting huge sets of data in memory.

Basically this is what I want / have:

I have a Twisted XMLRPC server running. The server keeps several (32) instances of a Foo class in memory. Each Foo instance contains a list, bar, which will hold several million records. A separate service retrieves data from a database and passes it to the XMLRPC server. The data is a dictionary whose keys correspond to the Foo instances and whose values are lists of dictionaries, like so:

data = {'foo1':[{'k1':'v1', 'k2':'v2'}, {'k1':'v1', 'k2':'v2'}], 'foo2':...}

Each Foo instance is then passed the value corresponding to its key, and the dictionaries in its Foo.bar list are updated and sorted.

from twisted.internet import reactor, threads
from twisted.web import server, xmlrpc

class XMLRPCController(xmlrpc.XMLRPC):

    def __init__(self):
        ...
        self.foos = {'foo1':Foo(), 'foo2':Foo(), 'foo3':Foo()}
        ...

    def update(self, data):
        # data maps each Foo key to its list of new records
        for k, v in data.items():
            threads.deferToThread(self.foos[k].processData, v)

    def getData(self, fookey):
        # return first 10 records of specified Foo.bar
        return self.foos[fookey].bar[0:10]

class Foo():

    def __init__(self):
        self.bar = []

    def processData(self, new_bar_data):
        for record in new_bar_data:
            # do processing, and add record, then sort
            # BUNCH OF PROCESSING CODE
            self.bar.sort(reverse=True)

The problem is that when update is called on the XMLRPCController with a lot of records (say 100K+), the server stops responding to my getData calls until all 32 Foo instances have completed their processData method. I thought deferToThread would prevent this, but I think I am misunderstanding where the problem is.

Any suggestions? I am open to using something else, like CherryPy, if it supports the required behavior.


EDIT

@Troy: This is how the reactor is set up

reactor.listenTCP(port_no, server.Site(XMLRPCController()))
reactor.run()

As for the GIL, would it be viable to lower the sys.setcheckinterval() value, so that the lock is released more often and the data can be read?

A: 

I don't know how long your processData method runs, nor how you're setting up your Twisted reactor. By default, the Twisted reactor has a thread pool of between 0 and 10 threads. You may be trying to defer as many as 32 long-running calculations to at most 10 threads, so the remaining calls wait in a queue until a thread frees up. This is sub-optimal.
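
To see the queueing effect in isolation, here is a minimal sketch that uses the standard library's ThreadPoolExecutor as a stand-in for the reactor's thread pool (it is not Twisted's own pool). The names run_jobs and job_seconds are made up for this illustration, and time.sleep stands in for processData.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_jobs(n_jobs=32, max_workers=10, job_seconds=0.01):
    """Run n_jobs on a bounded pool; return (peak concurrency, jobs completed)."""
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def job(i):
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(job_seconds)  # hypothetical stand-in for Foo.processData
        with lock:
            state["active"] -= 1
        return i

    # Jobs beyond max_workers wait in the pool's queue until a worker is free.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(job, range(n_jobs)))
    return state["peak"], len(results)


peak, completed = run_jobs()
print("peak concurrency:", peak, "of", completed, "jobs")
```

In Twisted itself the pool size can be changed with reactor.suggestThreadPoolSize(), but for CPU-bound work under the GIL a bigger pool will probably not make the server more responsive.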

You also need to ask what role the GIL is playing in updating all these collections.

Edit: Before you make any serious changes to your program (like calling sys.setcheckinterval()), you should probably run it under the profiler or the Python trace module. These should tell you which methods are using all your time. Without the right information, you can't make the right changes.
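
As a rough sketch of what that could look like with cProfile from the standard library: process_data below is a hypothetical stand-in for Foo.processData (append a record, then re-sort), and profile_call is a helper invented for this example.

```python
import cProfile
import io
import pstats
import random


def process_data(bar, new_records):
    # Hypothetical stand-in for Foo.processData: add each record, then re-sort.
    for record in new_records:
        bar.append(record)
        bar.sort(reverse=True)


def profile_call(func, *args):
    """Run func(*args) under cProfile and return the report as text."""
    profiler = cProfile.Profile()
    profiler.runcall(func, *args)
    out = io.StringIO()
    stats = pstats.Stats(profiler, stream=out)
    stats.sort_stats("cumulative").print_stats(5)  # top entries by cumulative time
    return out.getvalue()


report = profile_call(process_data, [], [random.random() for _ in range(2000)])
print(report)
```

If the report shows most of the time inside the list's sort method, sorting once per batch instead of once per record may be worth trying before anything else.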

Troy J. Farrell
+1  A: 

The easiest way to get the app to be responsive is to break up the CPU-intensive processing into smaller chunks and let the Twisted reactor run in between them. For example, after finishing one chunk, call reactor.callLater(0, process_next_chunk) to schedule the next one. This effectively means implementing cooperative multitasking yourself.
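
A minimal sketch of the chunking idea, assuming the records are in a list and the per-chunk work can be expressed as a function. The names process_in_chunks, handle_chunk, chunk_size and on_done are made up for this example. The scheduler is passed in as call_later so the sketch runs without Twisted; it is expected to behave like reactor.callLater(delay, func, *args).

```python
def process_in_chunks(records, handle_chunk, call_later, chunk_size=1000, on_done=None):
    """Process records one chunk at a time, yielding to the event loop in between.

    call_later(delay, func, *args) is assumed to schedule func(*args) on the
    event loop, the way reactor.callLater does.
    """
    def step(start):
        handle_chunk(records[start:start + chunk_size])
        next_start = start + chunk_size
        if next_start < len(records):
            # Schedule the next chunk instead of looping, so pending requests
            # (such as getData) can be served before it runs.
            call_later(0, step, next_start)
        elif on_done is not None:
            on_done()

    call_later(0, step, 0)
```

With Twisted this would be called as process_in_chunks(v, foo.processData, reactor.callLater), where foo is one of the Foo instances. Because everything then runs in the reactor thread, getData never sees bar in the middle of a chunk. The chunk size is a trade-off: smaller chunks mean better responsiveness but more scheduling overhead.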

Another way would be to use separate processes to do the work, which also lets you benefit from multiple cores. Take a look at Ampoule (https://launchpad.net/ampoule), which provides an API similar to deferToThread.

truppo
Putting each of the Foo instances into its own child process does sound like a reasonable thing to try, at least for starters.
Jean-Paul Calderone