I'm having trouble launching threads from a list of functions. The functions are in a list because they are configuration-specific. I'm wrapping each function so that I can store its result in 'self', but something goes wrong in a non-threadsafe way: the right number of threads start, but some of them run the wrong function. Here's the example code:

import threading, time

class runParallelTest():
    def __init__(self):
        pass

    def runList(self, functionList):
        threadList = []
        for functionListIndex in range(0, len(functionList)):
            newThread = threading.Thread(target=lambda:self._run_parallel_job(functionList[functionListIndex]))
            newThread.start()
            threadList.append(newThread)
            # sleep delay that makes it all work fine.
            #time.sleep(2)

        # We wait for all the threads to complete and if any of them
        # doesn't we report a failure.
        for thread in threadList:
            thread.join(3600*24) # 1 day better be enough
            if thread.isAlive() == True:
                raise Exception("thread.isAlive==True")

    def _run_parallel_job(self, function):
        results = function()
        # store the results in a threadsafe way in self
        # (I promise I'm using semaphores)

def f(x):
    print "f(%d) run" % x
    return x

if __name__ == '__main__':
    rp = runParallelTest()

    functionList = [
        lambda:f(0),
        lambda:f(1),
        lambda:f(2),
        lambda:f(3),
        lambda:f(4),
        lambda:f(5),
        lambda:f(6),
        lambda:f(7),
        ]

    rp.runList(functionList)

When I run, I see things like this:

> python thread_problem.py
f(0) run
 f(1) run
f(2) run
 f(4) run
f(5) run
f(5) run
f(6) run
f(7) run
>

While I expect different orders in the prints, I think I should see the numbers 0-7 with no repeats, but I don't. If I add the time.sleep(2), the problem magically goes away, but I'd really like to understand why it doesn't work the way I think it should.

Thanks a bunch!

A:

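The problem is that functionList[functionListIndex] is evaluated only when the enclosing lambda runs (inside the thread), not when the lambda is created. The lambda captures the variable functionListIndex, not its value at that moment, so by the time the thread runs, the loop may already have advanced the index.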
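The same late-binding behaviour can be reproduced without threads. The sketch below is only an illustration: the helper name collect and its flag are hypothetical, not part of the question's code. Deferring the calls until the loop has finished stands in for a thread that starts late, while calling inside the loop roughly corresponds to what the time.sleep(2) workaround achieves.

```python
def collect(call_immediately):
    """Build one lambda per loop iteration and record what each returns."""
    results = []
    pending = []
    for i in range(3):
        callback = lambda: i  # closes over the variable i, not its current value
        if call_immediately:
            # Called before the loop advances, so i still has the expected value.
            results.append(callback())
        else:
            # Called later, like a thread that has not started running yet.
            pending.append(callback)
    for callback in pending:
        results.append(callback())
    return results

deferred = collect(False)   # every lambda sees the final value of i
immediate = collect(True)   # each lambda sees the value from its own iteration
```

With the calls deferred, all three lambdas return the last value of the loop variable; in the threaded version the outcome is less uniform because it depends on when each thread happens to be scheduled.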

To fix this, you can pass a parameter to the lambda that will be evaluated at definition time:

newThread = threading.Thread(target=lambda func=functionList[functionListIndex]: self._run_parallel_job(func))

This works because default parameter values are evaluated once, when the function is defined, rather than each time it is called.
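A minimal sketch of that difference, using made-up names (index, bound, unbound) purely for illustration:

```python
index = 0

# The default value is computed right here, while index is still 0.
bound = lambda value=index: value

# No default: index is looked up only when the lambda is called.
unbound = lambda: index

index = 5  # simulate the loop moving on before the thread runs

bound_result = bound()      # still sees the value captured at definition time
unbound_result = unbound()  # sees whatever index is now
```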

A more Pythonic solution is to avoid the lambda and use the args parameter:

newThread = threading.Thread(target=self._run_parallel_job, args=(functionList[functionListIndex],))
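Putting the args approach together, a trimmed-down version of the runner might look like the sketch below. It is not the asker's actual class: the names ParallelRunner, run_list and results are assumptions, a plain threading.Lock stands in for the semaphores mentioned in the question, and the join timeout is omitted for brevity. The code avoids print so that it runs unchanged on Python 2 and Python 3.

```python
import functools
import threading

class ParallelRunner(object):
    """Hypothetical stand-in for the question's runParallelTest class."""

    def __init__(self):
        self._lock = threading.Lock()
        self.results = []

    def run_list(self, function_list):
        threads = []
        for function in function_list:
            # The function object is passed via args, so it is bound when
            # the Thread is created, not when the thread starts running.
            thread = threading.Thread(target=self._run_parallel_job,
                                      args=(function,))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

    def _run_parallel_job(self, function):
        result = function()
        with self._lock:  # guard the shared list against concurrent appends
            self.results.append(result)

def f(x):
    return x

runner = ParallelRunner()
runner.run_list([functools.partial(f, n) for n in range(8)])
sorted_results = sorted(runner.results)  # completion order varies, so sort
```

Iterating over the functions directly, instead of over indices, also removes the shared index variable that caused the original problem.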
interjay
Ah! Now I understand. Thanks for the help.
Mike Miller