Basically what I'm trying to do is fetch a couple of websites through proxies and process the data. The problem is that the requests rarely fail cleanly; setting socket timeouts wasn't very helpful either, because the timeouts often didn't work.

So what I did is:

from queue import Queue

q = Queue()
s = ['google.com', 'ebay.com']  # And so on
for item in s:
    q.put(item)


def worker():
    item = q.get()
    data = fetch(item)  # This is the buggy part
    # Process the data, yadayada

for i in range(workers):
    t = InterruptableThread(target=worker)
    t.start()


# Somewhere else
if WorkerHasLivedLongerThanTimeout:
    worker.terminate()

(InterruptableThread class.) The problem is that I only want to kill threads that are still stuck on the fetching part. I also want the item to go back into the queue, i.e.:

class Worker(InterruptableThread):
    def run(self):
        self.status = 0
        self.item = q.get()
        data = fetch(self.item)  # This is the buggy part
        self.status = 1  # Don't kill me now, bro!
        # Process the data, yadayada

# Somewhere else
if WorkerHasLivedLongerThanTimeout and worker.status != 1:
    q.put(worker.item)
    worker.terminate()

How can this be done?

+1  A: 

edit: breaking news; see below.

I decided recently that I wanted to do something pretty similar, and what came out of it was the pqueue_fetcher module. It ended up being mainly a learning endeavour: I learned, among other things, that it's almost certainly better to use something like twisted than to try to kill Python threads reliably.

That being said, there's code in that module that more or less answers your question. It basically consists of a class whose objects can be set up to get locations from a priority queue and feed them into a fetch function that's supplied at object instantiation. If the location's resources get successfully received before their thread is killed, they get forwarded on to the results queue; otherwise they're returned to the locations queue with a downgraded priority. Success is determined by a passed-in function that defaults to bool.
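
Roughly, the pattern is something like this (a minimal sketch with made-up names; this is not pqueue_fetcher's actual API):

from queue import PriorityQueue, Queue

class Fetcher(object):
    def __init__(self, fetch, locations, results, success=bool):
        self.fetch = fetch          # fetch function supplied at instantiation
        self.locations = locations  # PriorityQueue of (priority, location)
        self.results = results      # Queue for successfully fetched data
        self.success = success      # success test, defaulting to bool

    def work(self):
        priority, location = self.locations.get()
        try:
            data = self.fetch(location)
        except BaseException:
            # Covers an asynchronously raised kill exception too: return
            # the location to the queue with a downgraded priority.
            self.locations.put((priority + 1, location))
            raise
        if self.success(data):
            self.results.put((location, data))
        else:
            self.locations.put((priority + 1, location))

(Whether that except clause actually fires when the exception arrives asynchronously is precisely the trouble described below.)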

Along the way I ended up creating the terminable_thread module, which just packages the most mature variation I could find of the code you linked to as InterruptableThread. It also adds a fix for 64-bit machines, which I needed in order to use that code on my Ubuntu box. terminable_thread is a dependency of pqueue_fetcher.
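
The heart of that technique, in the form it's usually passed around (a sketch, not terminable_thread's exact code), is a ctypes call; the 64-bit fix comes down to wrapping the thread id in ctypes.c_long rather than passing a bare Python int:

import ctypes

def async_raise(tid, exctype):
    # Ask the interpreter to raise exctype asynchronously in thread tid.
    if not issubclass(exctype, BaseException):
        raise TypeError("only exception types can be raised")
    # ctypes.c_long(tid) is the 64-bit fix: a bare int would be
    # marshalled as a C int and truncated on 64-bit platforms.
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(tid), ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res > 1:
        # More than one thread state was modified: undo and complain.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")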

Probably the biggest stumbling block I hit is that raising an asynchronous exception, as both terminable_thread and the InterruptableThread you mentioned do, can have some weird results. In the test suite for pqueue_fetcher, the fetch function blocks by calling time.sleep. I found that if a thread is terminate()d while blocking like that, and the sleep call is the last (or not even the last) statement in a nested try block, execution will actually bounce to the except clause of the outer try block, even if the inner one has an except clause matching the raised exception. I'm still sort of shaking my head in disbelief, but there's a test case in pqueue_fetcher that re-enacts this. I believe "leaky abstraction" is the correct term here.
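
Roughly the shape of the situation (hypothetical names; Killed stands in for whatever exception gets raised asynchronously):

import time

class Killed(Exception):  # stand-in for the asynchronously raised exception
    pass

def fetch(location):
    try:
        try:
            time.sleep(10)  # the thread is terminate()d while blocked here
        except Killed:
            pass  # you'd expect control to land here...
    except Exception:
        pass  # ...but it can bounce out to this outer handler instead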

I wrote a hacky workaround that just does some random thing (in this case, getting a value from a generator) to break up the "atomicity" (not sure if that's actually what it is) of that part of the code. The workaround can be overridden via the fission parameter to pqueue_fetcher.Fetcher. The default one seems to work, but certainly not in any way that I would consider particularly reliable or portable.
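
If it helps to picture it, "getting a value from a generator" means something along these lines (a guess at the shape, not the module's actual code):

def _ticks():
    while True:
        yield None

_tick = _ticks()

def default_fission():
    # Does nothing useful in itself; consuming a generator value just
    # seems to break up the stretch of code in which the pending
    # asynchronous exception was being mis-delivered.
    next(_tick)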

So my call, after discovering this interesting piece of data, was to henceforth avoid using this technique (i.e. calling ctypes.pythonapi.PyThreadState_SetAsyncExc) altogether.

In any case, this still won't work if you need to guarantee that any request whose entire data set has been received (and, e.g., acknowledged to the server) gets forwarded on to results. To be sure of that, you have to guarantee that the code doing that last network transaction and the forwarding is guarded from interruption, without guarding the entire retrieval operation from interruption (since that would prevent timeouts from working). And to do that you basically need to rewrite the retrieval operation (i.e. the socket code) to be aware of whichever exception you're going to raise with terminable_thread.Thread.raise_exc.
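
A cooperative approximation, assuming the watchdog checks a flag before raising, much like the status flag in your question (sketch only; note the check-then-raise race window never fully closes, which is why the socket code itself would need to be exception-aware):

import threading
from queue import Queue

class Worker(threading.Thread):
    def __init__(self, locations, results, fetch):
        super().__init__()
        self.locations = locations
        self.results = results
        self.fetch = fetch
        self.committed = False  # once True, the watchdog must not kill us

    def run(self):
        location = self.locations.get()
        data = self.fetch(location)  # interruptible: may be killed here
        self.committed = True        # entering the protected final step
        self.results.put((location, data))

# Watchdog side, terminable_thread-style (Killed being whatever
# exception you choose to raise):
#     if timed_out(worker) and not worker.committed:
#         worker.raise_exc(Killed)

This narrows the window but doesn't close it: the kill can still land between the last network transaction inside fetch and the flag assignment.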

I've yet to learn twisted, but since it's the Premier Python Asynchronous Networking Framework©™®, I expect it must have some elegant, or at least workable, way of dealing with such details. I'm hoping it provides a parallel way to implement fetching from non-network sources (e.g. a local filestore, a DB, etc.), since I'd like to build an app that can glean data from a variety of sources in a medium-agnostic way.

Anyhow, if you're still intent on trying to work out a way to manage the threads yourself, you can perhaps learn from my efforts. Hope this helps.

this just in:

I've realized that the tests that I thought had stabilized have actually not, and are giving inconsistent results. This appears to be related to the issues mentioned above with exception handling and the use of the fission function. I'm not really sure what's going on with it, and don't plan to investigate in the immediate future unless I end up having a need to actually do things this way.

intuited