views:

218

answers:

4

I have a large XML data file (>160M) to process, and it seems like SAX/expat/pulldom parsing is the way to go. I'd like to have a thread that sifts through the nodes and pushes nodes to be processed onto a queue, and then other worker threads pull the next available node off the queue and process it.

I have the following (it should have locks, I know - it will, later)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

The problem is that the body of the while block gets called only once, and then I can't even ctrl-C interrupt it. On smaller files, the output is as expected, but that seems to indicate that the handler only gets called when the document is fully parsed, which seems to defeat the purpose of a SAX parser.

I'm sure it's my own ignorance, but I don't see where I'm making the mistake.

PS: I also tried changing start_handler thus:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

No love, though.

+1  A: 

The only thing I see is wrong is that you are accessing q simultaneously from different threads - no locking as you write indeed. That is asking for trouble - and you're probably getting trouble in the form of the Python interpreter locking up on you. :)

Try locking, it's really not very difficult:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

You see, it was really simple, we just created a lock variable to guard our object, and acquire that lock every time before we use the object and release every time after we finished our task on the object. This way we guaranteed that q.append(name) will never overlap with print(q).


(With newer versions of Python there is also a "with .... " syntax that helps you not to release locks or close files or other cleanups one frequently forgets.)

Bandi-T
+5  A: 

I'm not too sure about this problem. I'm guessing the call to ParseFile is blocking and only the parsing thread is being run because of the GIL. A way around this would be to use multiprocessing instead. It's designed to work with queues, anyway.

You make a Process and you can pass it a Queue:

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

I've included an elements list, just to replicate your original script. Your final solution will probably use get_nowait and a Pool or something similar.

Brian McKenna
Yes, this is a good path to go down - as you said you'd want to use queues anyway.
Bandi-T
I tried that code; it avoids the lockup, but ParseFile still doesn't seem to output anything until it's read the whole input.
slide_rule
A: 

I do not know a whole lot about the implementation, but if the parse is C code that executes until complete, other Python threads will not run. If the parser is calling back out to Python code, the GIL may be released for other threads to run, but I am not sure about that. You might want to check those details.

Noctis Skytower
+4  A: 

ParseFile, as you've noticed, just "gulps down" everything -- no good for the incremental parsing you want to do! So, just feed the file to the parser a bit at a time, making sure to conditionally yield control to other threads as you go -- e.g.:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

the time.sleep(0.0) call is Python's way to say "yield to other threads if any are ready and waiting"; the Parse method is documented here.

The second point is, forget locks for this usage! -- use Queue.Queue instead, it's intrinsically threadsafe and almost invariably the best and simplest way to coordinate multiple threads in Python. Just make a Queue instance q, q.put(name) on it, and have worked threads block on q.get() waiting to get some more work to do -- it's SO simple!

(There are several auxiliary strategies you can use to coordinate the termination of worker threads when there's no more work for them to do, but the simplest, absent special requirements, is to just make them daemon threads, so they will all terminate when the main thread does -- see the docs).

Alex Martelli
Voted up for the Queue suggestions, but are you sure about about ParseFile gulping everything down in one go? It does call back into the Python handlers to handle the tags as it goes, that's the whole purpose of SAX parsing... or are you saying that is not enough to trigger a thread switch in Python?
Bandi-T
If you want SAX, you can use xml.sax, see http://docs.python.org/library/xml.sax.html?highlight=sax#module-xml.sax ; the OP's not using SAX, but rather xml.parsers.expat, a lower-abstraction interface which does **not** impose an incremental strategy (it _supports_ it, but doesn't _impose_ it, leaving it up to the Python code level to pick and choose).
Alex Martelli
slide_rule
Alex Martelli