How can I implement a conditional lock in a threaded application? For instance, I have 30 threads calling a function, and most of the time all threads can run it simultaneously, but depending on the function's input there can be a condition where only one thread may do that one thing. (If an input value is repeated and some thread is still working on it, then I need a lock.)

I know that the threading module has RLock(), but I don't know how to use it in the way I described in the first part.

Edit: The question is actually about how to prevent any two threads from running the same function with the same argument at the same time. (Thanks to David for helping me formulate my question :) )

+5  A: 

Try this: have a lock in the module where your function is, and if the input to the function is such that locking is required, acquire the lock inside the function. Otherwise don't.

import threading

l = threading.RLock()

def fn(arg):
    if arg == arg_that_needs_lock:
        l.acquire()
        try:
            pass  # do stuff
        finally:
            l.release()
    else:
        pass  # do other stuff


EDIT:

As far as I can tell now, the question is actually about how to prevent any two threads from running the same function with the same argument at the same time. There's no problem with two threads running the same function with different arguments at the same time, though. The simple method to do this, if all valid arguments to the function can be dictionary keys, is to create a dictionary of arguments to locks:

import threading

dict_lock = threading.RLock()
locks = {}

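def fn_dict(arg):
    # Look up (or create) the lock for this argument while holding dict_lock
    dict_lock.acquire()
    try:
        if arg not in locks:
            locks[arg] = threading.RLock()
        l = locks[arg]
    finally:
        dict_lock.release()
    # Hold the per-argument lock only while doing the work
    l.acquire()
    try:
        pass  # do stuff
    finally:
        l.release()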
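
For readers on a Python version that has the with statement, the same dictionary-of-locks idea can be written more compactly. This is only a sketch under assumptions: the names run_exclusive, registry_lock and arg_locks are made up for illustration, and the work to be done is passed in as a callable.

```python
import threading
from collections import defaultdict

registry_lock = threading.Lock()          # guards the dictionary itself
arg_locks = defaultdict(threading.Lock)   # one lock per distinct argument

def run_exclusive(arg, work):
    """Call work(arg) while holding the lock that belongs to arg.

    Hypothetical helper: arg must be hashable, work is any callable.
    """
    with registry_lock:
        lock = arg_locks[arg]   # created on first use of this argument
    with lock:
        return work(arg)
```

Calls with different arguments take different locks and run in parallel; calls with the same argument queue up on the same lock. As with the version above, the dictionary only ever grows.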

If your function can be called with many different arguments, though, that amounts to a lot of locks. Probably a better way is to have a set of all arguments with which the function is currently executing, and have the contents of that set protected by a lock. I think this should work:

set_condition = threading.Condition()
current_args = set()

def fn_set(arg):
    set_condition.acquire()
    try:
        while arg in current_args:
            set_condition.wait()
        current_args.add(arg)
    finally:
        set_condition.release()
    # do stuff
    set_condition.acquire()
    try:
        current_args.remove(arg)
        set_condition.notifyAll()  # also spelled notify_all() since Python 2.6
    finally:
        set_condition.release()
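
One way to reuse the set-plus-condition idea across several functions is to wrap it in a decorator. The following is a rough sketch, not a tested library: the name exclusive_per_argument is invented here, it assumes a with-capable Python and a single hashable argument, and it adds a try/finally so that an exception in the wrapped function does not leave the argument stuck in the set.

```python
import functools
import threading

def exclusive_per_argument(func):
    """Hypothetical decorator: no two threads run func with the same argument at once."""
    condition = threading.Condition()
    running = set()   # arguments currently being processed

    @functools.wraps(func)
    def wrapper(arg):
        with condition:
            # Sleep until no other thread is working on this argument
            while arg in running:
                condition.wait()
            running.add(arg)
        try:
            return func(arg)
        finally:
            with condition:
                running.discard(arg)
                condition.notify_all()   # let waiters re-check their argument
    return wrapper
```

Decorating a function with @exclusive_per_argument then gives it the same behaviour as fn_set: different arguments run concurrently, equal arguments run one after another.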
David Zaslavsky
The only problem is that I don't have a precondition that requires the lock; I need the lock only if some other thread is, for instance, using the same value for the argument.
Ib33X
"for instance" you say... you need to be more specific about that to get an answer. Otherwise we're only guessing about what your question means - see if you can edit it to be clearer.
David Zaslavsky
In Python 2.4 there is no with statement. Is there any alternative?
Ib33X
@lb33X: For older Pythons, you can replace the "with l:" statements with l.acquire(); try: <body of block>; finally: l.release(). They effectively just wrap the region where the lock is held.
Brian
Yep, I've edited that in.
David Zaslavsky
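
To make the equivalence from the comments above concrete, here is a small sketch that puts the two spellings side by side. The function and variable names (bump_with, bump_manual, counter) are made up for the example; the with form needs Python 2.6+, or 2.5 with the __future__ import.

```python
import threading

lock = threading.RLock()
counter = {"with": 0, "manual": 0}

def bump_with():
    # The lock is acquired on entry and released on exit, even on an exception
    with lock:
        counter["with"] += 1

def bump_manual():
    # Same effect, written out for Pythons without the with statement
    lock.acquire()
    try:
        counter["manual"] += 1
    finally:
        lock.release()
```

In both functions the lock is held only for the indented region and is released whether or not the body raises.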
+1  A: 

It sounds like you want something similar to a Readers-Writer lock.

This is probably not what you want, but might be a clue:

from __future__ import with_statement
import threading

def RWLock(readers = 1, writers = 1):
    m = _Monitor(readers, writers)
    return (_RWLock(m.r_increment, m.r_decrement), _RWLock(m.w_increment, m.w_decrement))

class _RWLock(object):
    def __init__(self, inc, dec):
        self.inc = inc
        self.dec = dec

    def acquire(self):
        self.inc()
    def release(self):
        self.dec()
    def __enter__(self):
        self.inc()
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.dec()

class _Monitor(object):
    def __init__(self, max_readers, max_writers):
        self.max_readers = max_readers
        self.max_writers = max_writers
        self.readers = 0
        self.writers = 0
        self.monitor = threading.Condition()

    def r_increment(self):
        with self.monitor:
            # Wait while a writer is active or the reader limit is reached
            while self.writers > 0 or self.readers >= self.max_readers:
                self.monitor.wait()
            self.readers += 1

    def r_decrement(self):
        with self.monitor:
            assert self.readers > 0
            self.readers -= 1
            self.monitor.notifyAll()  # wake waiting readers and writers

    def w_increment(self):
        with self.monitor:
            # Wait while any reader is active or the writer limit is reached
            while self.readers > 0 or self.writers >= self.max_writers:
                self.monitor.wait()
            self.writers += 1

    def w_decrement(self):
        with self.monitor:
            assert self.writers > 0
            self.writers -= 1
            self.monitor.notifyAll()  # wake waiting readers and writers

if __name__ == '__main__':

    rl, wl = RWLock()
    wl.acquire()
    wl.release()
    rl.acquire()
    rl.release()

(Unfortunately not tested)

Henrik Gustafsson