Let's assume I'm stuck using Python 2.6, and can't upgrade (even if that would help). I've written a program that uses the Queue class. My producer is a simple directory listing. My consumer threads pull a file from the queue, and do stuff with it. If the file has already been processed, I skip it. The processed list is generated before all of the threads are started, so it isn't empty.

Here's some pseudo-code.

import os, Queue, sys, threading

processed = []

def consumer():
    while True:
        file = dirlist.get(block=True)
        if file in processed:
            print "Ignoring %s" % file
        else:
            pass  # do stuff here
        dirlist.task_done()

dirlist = Queue.Queue()

for f in os.listdir("/some/dir"):
    dirlist.put(f)

max_threads = 8

for i in range(max_threads):
    thr = threading.Thread(target=consumer)
    thr.start()

dirlist.join()

The strange behavior I'm getting is that if a thread encounters a file that's already been processed, the thread stalls out and waits until the entire program ends. I've done a little bit of testing, and the first 7 threads (assuming 8 is the max) stop, while the 8th thread keeps processing, one file at a time. But, by doing that, I'm losing the entire reason for threading the application.

Am I doing something wrong, or is this the expected behavior of the Queue/threading classes in Python 2.6?

+1  A: 

I tried running your code and did not see the behavior you describe. However, the program never exits: once the queue is empty, each worker blocks forever in .get(block=True), and because the threads are not daemons the process can never terminate. I recommend changing the .get() call as follows:

    try:
        file = dirlist.get(True, 1)
    except Queue.Empty:
        return
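
For context, here is roughly how the consumer looks with that change folded in (a sketch; the names match your posted code, and the else branch is still a placeholder for the real work):

def consumer():
    while True:
        try:
            # wait up to 1 second; if nothing arrives, assume the queue is drained
            file = dirlist.get(True, 1)
        except Queue.Empty:
            return
        if file in processed:
            print "Ignoring %s" % file
        else:
            pass  # do stuff here
        dirlist.task_done()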

If you want to know which thread is currently executing, you can import the thread module and print thread.get_ident().

I added the following line after the .get():

    print file, thread.get_ident()

and got the following output:

bin 7116328
cygdrive 7116328
 cygwin.bat 7149424
cygwin.ico 7116328
 dev etc7598568
7149424
 fix 7331000
 home 7116328lib
 7598568sbin
 7149424Thumbs.db
 7331000
tmp 7107008
 usr 7116328
var 7598568proc
 7441800

The output is messy because the threads are writing to stdout at the same time. The variety of thread identifiers further confirms that all of the threads are running.

Perhaps something is wrong in the real code or your test methodology, but not in the code you posted?

Daniel Stutzbach
Ah, that explains why I had to keep killing the program. Thanks for the tip.
Pat
@voipme Thanks is nice. Up-voting is better. :-)
Daniel Stutzbach
+1  A: 

Since the problem only manifests when a thread finds a file that's already been processed, it seems like it has something to do with the processed list itself. Have you tried putting a simple lock around it? For example:

processed = []
processed_lock = threading.Lock()

def consumer():
    while True:
        with processed_lock:  # Lock objects are context managers themselves
            fileInList = file in processed
        if fileInList:
            # ... et cetera

Threading tends to cause the strangest bugs, even if they seem like they "shouldn't" happen. Using locks on shared variables is the first step to make sure you don't end up with some kind of race condition that could cause threads to deadlock.
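
If it helps, here's a rough sketch of how both the membership test and the update could sit under the same lock (process_file is just a stand-in for whatever your real per-file work is):

def consumer():
    while True:
        file = dirlist.get(block=True)
        with processed_lock:
            already_done = file in processed
        if already_done:
            print "Ignoring %s" % file
        else:
            process_file(file)          # stand-in for the real work
            with processed_lock:
                processed.append(file)
        dirlist.task_done()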


Of course, if what you're doing under # do stuff here is CPU-intensive, then Python will only run code from one thread at a time anyway, due to the Global Interpreter Lock. In that case, you may want to switch to the multiprocessing module - it's very similar to threading, though you will need to replace shared variables with another solution (see the multiprocessing documentation on sharing state between processes for details).
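
As a very rough illustration of that route (a sketch only - process_file is a stand-in for your real per-file work, and the already-processed files are filtered out up front rather than shared between workers):

import multiprocessing, os

def process_file(name):
    pass  # stand-in for the real per-file work

if __name__ == "__main__":
    processed = set()   # populated beforehand, as in your original program
    files = [f for f in os.listdir("/some/dir") if f not in processed]
    pool = multiprocessing.Pool(processes=8)
    pool.map(process_file, files)   # farms the files out to worker processes
    pool.close()
    pool.join()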

Daniel G
I hadn't thought of putting a lock on the processed list. Now that I think of it, it definitely makes sense. And thanks for the recommendation to use the multiprocessing module.
Pat