Hello, I am opening a file which has 100,000 URLs. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far I have looked at the many confusing ways Python implements threading/concurrency. I have even looked at the Python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.

Thank you, Igor

+1  A: 

The easiest way would be to use Python's built-in threading library. They're not "real" / kernel threads, but are good enough. You'd want a queue & thread pool. One option is here, but it's trivial to write your own. You can't parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.
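
As a rough sketch of the "queue and thread pool" idea, assuming a newer Python than the 2.6 in the question (Python 3, where `concurrent.futures` ships with the standard library and keeps its own internal work queue). The names `check_status` and `check_all` and the pool size of 100 are illustrative, and only plain http:// URLs are handled:

```python
import http.client
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def check_status(url, timeout=10):
    """Send a HEAD request and return the status code, or "error"."""
    parts = urlparse(url)
    try:
        conn = http.client.HTTPConnection(parts.netloc, timeout=timeout)
        conn.request("HEAD", parts.path or "/")
        status = conn.getresponse().status
        conn.close()
        return status
    except Exception:
        return "error"


def check_all(urls, workers=100, check=check_status):
    """Run `check` over every URL with at most `workers` requests in flight."""
    urls = list(urls)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() hands URLs to idle workers and yields results in input order.
        return list(zip(urls, pool.map(check, urls)))
```

Something like `for url, status in check_all(line.strip() for line in open("urllist.txt")): print(status, url)` would then print one line per URL.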

Pestilence
Python's threads are quite real, as opposed to Ruby's for instance. Under the hood they are implemented as native OS threads, at least on Unix/Linux and Windows. Maybe you're referring to the GIL, but it doesn't make the threads less real...
Eli Bendersky
Eli is right about Python's threads, but Pestilence's point that you'd want to use a thread pool is correct, too. The last thing that you'd want to do in this case is try to start a separate thread for each of the 100K requests simultaneously.
Adam Crossland
Igor, you can't sensibly post code snippets in comments, but you can edit your question and add them there.
Adam Crossland
Pestilence: how many queues and threads-per-queue would you recommend for my solution?
PythonUser
A: 

Using a thread pool is a good option, and will make this fairly easy. Unfortunately, python doesn't have a standard library that makes thread pools ultra easy. But here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/

Code example from their site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Hope this helps.

Kevin Wiskia
I suggest that you specify q_size for ThreadPool like this: ThreadPool(poolsize, q_size=1000). That way you won't have 100000 WorkRequest objects in memory. "If `q_size`>0 the size of the work **request queue** is limited and the thread pool blocks when the queue is full and it tries to put more work requests in it (see `putRequest` method), unless you also use a positive `timeout` value for `putRequest`."
Kalmi
So far I'm trying to implement the threadpool solution - as suggested. However, I don't understand the parameter list in the makeRequests function. What is some_callable, list_of_args, callback? Perhaps if I saw a real code snippet that would help. I'm surprised that the author of that library didn't post ANY examples.
PythonUser
some_callable is the function in which all your work is done (connecting to the HTTP server). list_of_args is the list of arguments that will be passed into some_callable. callback is a function that will be called when the worker thread is done. It takes two arguments: the worker object (you don't really need to concern yourself with this) and the results that the worker retrieved.
Kevin Wiskia
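
To make the three parameters concrete, here is a sketch in Python 3 that does not import the library at all. A plain loop (`run_serially`, a hypothetical stand-in) plays the role of the pool so that the shape of some_callable, list_of_args, and callback is visible. The callback signature follows the description above (a request/worker object first, then the result); `check_url` and `on_result` are made-up names.

```python
import http.client
from urllib.parse import urlparse

results = {}


def check_url(url):
    """some_callable: does the real work for one item and returns a result."""
    parts = urlparse(url)
    conn = http.client.HTTPConnection(parts.netloc, timeout=10)
    conn.request("HEAD", parts.path or "/")
    return url, conn.getresponse().status


def on_result(request, result):
    """callback: receives the request object and what some_callable returned."""
    url, status = result
    results[url] = status


def run_serially(some_callable, list_of_args, callback):
    """Hypothetical stand-in for the pool: same three inputs, no threads."""
    for arg in list_of_args:
        callback(None, some_callable(arg))
```

With the library, the same three things (the `check_url` function, a list of URL strings, and `on_result`) are what would be handed to makeRequests.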
Totally untested code example: http://dakman.pastebin.com/eTUnQzmk
Kevin Wiskia
+1  A: 

For your case, threading will probably do the trick, as you'll probably be spending most of the time waiting for a response. There are helpful modules like Queue in the standard library that might help.

I did a similar thing with parallel downloading of files before and it was good enough for me, but it wasn't on the scale you are talking about.

If your task was more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes that won't block each other since the locking is per process)

Mattias Nilsson
Mattias, thanks. I did just that in my solution so far.
PythonUser
The only thing I'd like to mention is that spawning multiple processes may be more expensive than spawning multiple threads. Also, there is no clear performance gain in sending out 100,000 HTTP requests with multiple processes vs. multiple threads.
PythonUser
+3  A: 

A good approach to solving this problem is to first write the code required to get one result, then incorporate threading code to parallelize the application.

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you are limited in how many sockets you can open concurrently and in how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections, whether the requests are all against one server or spread across many. These limitations will probably require you to write the script so that it only polls a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

You can follow this design pattern to resolve the above issue:

  1. Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until you are finished.
  2. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.
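
The two steps above can be sketched roughly as follows, in Python 3. This is only one possible reading of the pattern: `launch_all` is a made-up name, `fetch` stands for whatever function performs a single request (it is expected to handle its own errors), and the launcher tracks its threads in a list rather than calling threading.active_count().

```python
import threading
import time


def launch_all(urls, fetch, max_threads=100, poll=0.05):
    """Start one thread per URL, never more than `max_threads` at once."""
    results = []   # request threads only ever append to this list
    running = []   # request threads that may still be alive

    def worker(url):
        results.append((url, fetch(url)))

    def launcher():
        for url in urls:
            # Wait until a slot frees up, dropping finished threads as we go.
            while len(running) >= max_threads:
                running[:] = [t for t in running if t.is_alive()]
                if len(running) >= max_threads:
                    time.sleep(poll)
            t = threading.Thread(target=worker, args=(url,))
            t.start()
            running.append(t)
        # No URLs left: wait for the stragglers, then let the launcher end.
        for t in running:
            t.join()

    boss = threading.Thread(target=launcher)
    boss.start()
    boss.join()
    return results
```

Results come back in completion order, not input order, which is why each entry carries its URL.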

I would suggest you use the threading module. You can use it to launch and track running threads. Python's threading support is bare, but the description of your problem suggests that it is completely sufficient for your needs.

Finally, if you'd like to see a pretty straightforward application of a parallel network application written in Python, check out ssh.py. It's a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.

Erik Garrison
erikg: would throwing in a queue into your equation be reasonable (for mutual-exclusion locking)? I suspect that Python's GIL isn't geared toward playing with thousands of threads.
PythonUser
Why do you need mutual-exclusion locking to prevent the generation of too many threads? I suspect I misunderstand the term. You can track running threads in a thread queue, removing them when they complete and adding more up to said thread limit. But in a simple case such as the one in question you can also just watch the number of active threads in the current Python process, wait until it falls below a threshold, and launch more threads up to the threshold as described. I guess you could consider this an implicit lock, but no explicit locks are required afaik.
Erik Garrison
erikg: don't multiple threads share state? On page 305 in O'Reilly's book "Python for Unix and Linux System Administration" it states: "... using threading without queues makes it more complex than many people can realistically handle. It is a much better idea to always use the queuing module if you find you need to use threads. Why? Because the queue module also alleviates the need to explicitly protect data with mutexes because the queue itself is already protected internally by a mutex." Again, I welcome your point of view on this.
PythonUser
Igor: You are absolutely right that you should use a lock. I've edited the post to reflect this. That said, practical experience with python suggests that you don't need to lock data structures which you modify atomically from your threads, such as by list.append or by the addition of a hash key. The reason, I believe, is the GIL, which provides operations such as list.append with a degree of atomicity. I am currently running a test to verify this (use 10k threads to append numbers 0-9999 to a list, check that all appends worked). After nearly 100 iterations the test has not failed.
Erik Garrison
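
A scaled-down version of the kind of test described in the comment above might look like this in Python 3. The function names and the thread counts are invented for the sketch. A passing run is evidence rather than proof, and the behavior it probes is a CPython implementation detail.

```python
import threading


def append_from_threads(n_threads=200, per_thread=50):
    """Have many threads append to one list with no explicit lock."""
    shared = []

    def work(start):
        for value in range(start, start + per_thread):
            shared.append(value)

    threads = [threading.Thread(target=work, args=(i * per_thread,))
               for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared


def all_appends_landed(shared, expected_count):
    """True if every value 0..expected_count-1 appears exactly once."""
    return sorted(shared) == list(range(expected_count))
```

Calling `all_appends_landed(append_from_threads(), 200 * 50)` in a loop repeats the experiment as many times as desired.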
Igor: I've asked another question on this topic: http://stackoverflow.com/questions/2740435/are-there-some-cases-where-python-threads-can-safely-manipulate-shared-state
Erik Garrison
Erik G: You are right about the GIL, it enforces atomicity and integrity of shared data. In my case, I am reading 100,000 URLs from a file, spawning as many threads as possible to send HTTP requests to those URLs, and putting the response codes into a data structure (finally writing results to a file). So the only benefit I see to using a queue in my case is its join() method, which prevents the program from exiting before all queue items are processed. Is there another benefit to using a queue that I am not seeing?
PythonUser
Igor: You can retain references to all the launched threads in a list, and then iterate over the threads, calling `join()` on each of them at the end of your program. (I provide an example in the other question I reference in these comments.) This will have the same effect. I think that the difference between the approaches in this case is mostly stylistic. I would find the non-queue case simpler and faster to write, but I might have more confidence in the long-term viability of the code I wrote using queues for thread management.
Erik Garrison
+1  A: 

If you're looking to get the best performance possible, you might want to consider using Asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial and the context switching within the Python interpreter adds even more on top of it. Threading will certainly get the job done but I suspect that an asynchronous route will provide better overall performance.

Specifically, I'd suggest the async web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve but it is quite easy to use once you get a good handle on Twisted's style of asynchronous programming.

A HowTo on Twisted's asynchronous web client API is available at:

http://twistedmatrix.com/documents/current/web/howto/client.html
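
For a feel of the event-loop style without installing anything, here is a rough sketch using the standard-library `asyncio` module of a newer Python (3.7+). It is swapped in for Twisted purely as an illustration, so nothing in it is Twisted's API. One thread drives many sockets, and a semaphore caps the number of open connections. `head_status`, `check_all`, and the limit of 200 are illustrative, and only plain http:// URLs are handled.

```python
import asyncio
from urllib.parse import urlparse


async def head_status(url, timeout=10):
    """Open a socket, send a bare HEAD request, and parse the status code."""
    parts = urlparse(url)
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(parts.hostname, parts.port or 80), timeout)
    try:
        request = "HEAD {} HTTP/1.0\r\nHost: {}\r\n\r\n".format(
            parts.path or "/", parts.netloc)
        writer.write(request.encode("ascii"))
        await writer.drain()
        status_line = await asyncio.wait_for(reader.readline(), timeout)
        return int(status_line.split()[1])   # second field of the status line
    finally:
        writer.close()


async def check_all(urls, limit=200, fetch=head_status):
    """Check every URL on one thread with at most `limit` requests in flight."""
    gate = asyncio.Semaphore(limit)

    async def one(url):
        async with gate:
            try:
                return url, await fetch(url)
            except Exception:
                return url, "error"

    return await asyncio.gather(*(one(u) for u in urls))
```

It would be driven with something like `asyncio.run(check_all(urls))`, which returns a list of (url, status) pairs in input order.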

Rakis
Rakis: I am currently looking into asynchronous and non-blocking I/O. I need to learn it better before I implement it. One comment I'd like to make on your post is that it is impossible (at least under my Linux distribution) to spawn "thousands of OS threads". There is a maximum number of threads that Python will allow you to spawn before the program breaks. And in my case (on CentOS 5) maximum number of threads is 303.
PythonUser
That's good to know. I've never tried spawning more than a handful in Python at once but I would have expected to be able to create more than that before it bombed.
Rakis
+8  A: 

Threads are absolutely not the answer here. They will provide both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is "the fastest way".

A little bit of twisted and its asynchronous HTTP client would give you much better results.

ironfroggy
ironfroggy: I am leaning toward your sentiments. I tried implementing my solution with threads and queues (for automatic mutexes), but can you imagine how long it takes to populate a queue with 100,000 things?? I'm still playing around with different options and suggestions by everyone on this thread, and maybe Twisted will be a good solution.
PythonUser
You can avoid populating a queue with 100k things. Just process items one at a time from your input, then launch a thread to process the request corresponding to each item. (As I describe below, use a launcher thread to start the HTTP request threads when your thread count is below some threshold. Make the threads write the results out into a dict mapping URL to response, or append tuples to a list.)
Erik Garrison
ironfroggy: Also, I'm curious about what bottlenecks you've found using Python threads? And how do Python threads interact with the OS kernel?
Erik Garrison
Make sure you install the epoll reactor; otherwise you'll be using select/poll, and it will be very slow. Also, if you're going to actually try to have 100,000 connections open simultaneously (assuming your program is written that way, and the URLs are on different servers), you'll need to tune your OS so that you won't run out of file descriptors, ephemeral ports, etc. (it's probably easier to just make sure that you don't have more than, say, 10,000 outstanding connections at once).
Mark Nottingham
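
One quick way to see how close a script might get to the file descriptor limit mentioned above is the standard-library `resource` module (Unix only). This is a sketch in Python 3; `open_file_limit`, `safe_connection_cap`, and the reserve of 64 descriptors are invented for illustration.

```python
import resource


def open_file_limit():
    """Return the (soft, hard) per-process limit on open file descriptors."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def safe_connection_cap(reserve=64):
    """Suggest a ceiling for simultaneous sockets, leaving `reserve` spare."""
    soft, _hard = open_file_limit()
    if soft == resource.RLIM_INFINITY:
        return None   # no per-process limit is set
    return max(soft - reserve, 0)
```

Each open connection uses one descriptor, so the value from `safe_connection_cap()` is a reasonable upper bound for the number of outstanding requests unless the limit is raised.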
erikg: you did recommend a great idea. However, the best result I was able to achieve with 200 threads was approx. 6 minutes. I'm sure there are ways to accomplish this in less time... Mark N: if Twisted is the way I decide to go, then the epoll reactor is surely useful. However, if my script will be run from multiple machines, wouldn't that necessitate the installation of Twisted on EACH machine? I don't know if I can convince my boss to go that route...
PythonUser
A: 

Consider using Windmill, although Windmill probably can't do that many threads.

You could do it with a hand rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.

Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.

Also, try looking into just using simple Perl with the LWP::ConnCache class. You'll probably get more performance (more connections) that way.

djangofan
+1  A: 

A solution:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Test time:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys     0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Ping time:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Kalmi
I'm basically using Twisted as a threadpool... duh...
Kalmi
Using Twisted as a threadpool is ignoring most of the benefits you can get from it. You should be using the async HTTP client instead.
Jean-Paul Calderone
+3  A: 

Twistedless solution:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url=q.get()
        status,url=getStatus(url)
        doSomethingWithResult(status,url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except Exception:
        # Report any network or parsing failure as "error" for this URL.
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q=Queue(concurrent*2)
for i in range(concurrent):
    t=Thread(target=doWork)
    t.daemon=True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

This one is slightly faster than the Twisted solution and uses less CPU.

Kalmi
Kalmi, your solution is pretty good. The timed results from running your program with a file of 100,000 URLs were: real 5m23.863s, user 1m28.177s, sys 2m34.299s. However, one question I have is: isn't populating the queue with each URL redundant, and doesn't it add overhead? Why not just spawn the processes from the URLs as you are reading them from the file (without using a queue)?
PythonUser
Well... This is basically a simple threadpool implementation. It ensures that there are no more than 200 jobs running at the same time. And I know no way to implement a threadpool without using something queuelike. And yes, you do need a threadpool. I'm pretty sure you want to be able to control the number of requests that can happen at the same time.
Kalmi
Kalmi: I wrote up a Python script similar to yours that contains the following: if threading.active_count() > 200: time.sleep(10). This allows outstanding threads to catch up so that the program doesn't crash. So no queue was needed in this implementation.
PythonUser
I believe that using a Queue is less overhead than starting threads. And your solution could end up idling for some amount of time if all the outstanding requests finish before the sleep is over.
Kalmi
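
If one thread per URL is kept, one way to avoid the idle time from a fixed sleep is to let a semaphore do the waiting. This is a sketch in Python 3 and not code from either poster; `run_bounded` is a made-up name and `fetch` stands for the function that performs a single request.

```python
import threading


def run_bounded(urls, fetch, max_threads=200):
    """One thread per URL, with a semaphore instead of sleep() as the cap."""
    slots = threading.BoundedSemaphore(max_threads)
    results = []
    threads = []

    def worker(url):
        try:
            results.append((url, fetch(url)))
        finally:
            slots.release()   # wakes the launcher the moment a slot frees up

    for url in urls:
        slots.acquire()       # blocks only while max_threads are running
        t = threading.Thread(target=worker, args=(url,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return results
```

This still pays the cost of creating a thread per URL, which is the overhead the queue-based pool avoids by reusing its 200 workers.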