I have a number of threads which wait on an event, perform some action, then wait on the event again. Another thread will trigger the event when it's appropriate.

I can't figure out a way to ensure that each waiting thread triggers exactly once upon the event being set. I currently have the triggering thread set it, sleep for a bit, then clear it. Unfortunately, this leads to the waiting threads grabbing the set event many times, or none at all.

I can't simply have the triggering thread spawn the response threads to run them once because they're responses to requests made from elsewhere.

In short: In Python, how can I have a thread set an event and ensure each waiting thread acts on the event exactly once before it gets cleared?
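
Roughly, the pattern I have now looks like this (a simplified sketch; the names and the sleep duration are made up, not my real code):

import threading
import time

event = threading.Event()

def do_response():
    pass  # stands in for the per-request work

def waiter():
    while True:
        event.wait()   # block until the trigger thread sets the event
        do_response()  # should run exactly once per trigger...
        # ...but if the event is still set when we loop back around,
        # wait() returns immediately and we respond again

def trigger():
    event.set()        # wake every thread currently in wait()
    time.sleep(0.1)    # hope all the waiters get through before the clear
    event.clear()      # a waiter that hasn't reached wait() yet misses this round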

Update:

I've tried setting it up using a lock and a queue, but it doesn't work. Here's what I have:

import threading
import Queue

# Globals - used to sync threads
waitingOnEvent = Queue.Queue()
MainEvent = threading.Event()
MainEvent.clear()    # Not sure this is necessary, but figured I'd be safe
mainLock = threading.Lock()

def waitCall():
    mainLock.acquire()
    waitingOnEvent.put("waiting")    # register this thread as a waiter
    mainLock.release()
    MainEvent.wait()                 # block until the trigger thread sets the event
    waitingOnEvent.get(False)
    waitingOnEvent.task_done()
    #do stuff
    return

def triggerCall():
    mainLock.acquire()
    itemsinq = waitingOnEvent.qsize()    # how many threads are waiting
    MainEvent.set()
    waitingOnEvent.join()                # block until every "waiting" item is task_done()
    MainEvent.clear()
    mainLock.release()
    return

The first time, itemsinq properly reflects how many calls are waiting, but only the first waiting thread to make the call will make it through. From then on, itemsinq is always 1, and the waiting threads take turns; each time the trigger call happens, the next goes through.

Update 2 It appears as though only one of the event.wait() threads is waking up, and yet the queue.join() is working. This suggests to me that one waiting thread wakes up, grabs from the queue and calls task_done(), and that single get()/task_done() somehow empties the queue and allows the join(). The trigger thread then completes the join(), clears the event, and thus prevents the other waiting threads from going through. Why would the queue register as empty/finished after only one get/task_done call, though?

Only one seems to be waking up, even if I comment out the queue.get() and queue.task_done() and hang the trigger so it can't clear the event.

+1  A: 

I'm not a Python programmer, but if an event can only be processed once, perhaps you need to switch to a message queue with the appropriate locking. When one thread wakes up and receives the event message, it processes it and removes it from the queue, so it's not there if other threads wake up and look in the queue.

bot403
+2  A: 

One solution I've used in the past is the Queue class for interthread communication. It is thread-safe and makes it easy to communicate between threads, whether you're using the multiprocessing or the threading library. You could have the child threads wait for something to enter the queue and then process the new entry. The Queue class also has a get() method which takes a handy blocking argument.
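
For instance (just a rough sketch; the queue, worker, and handler names are placeholders):

import threading
import Queue

work = Queue.Queue()

def handle(item):
    print 'processing', item       # stands in for real work

def worker():
    while True:
        item = work.get(True)      # block until something is put() on the queue
        if item is None:           # simple shutdown convention
            return
        handle(item)

t = threading.Thread(target=worker)
t.start()
work.put('some event')             # the triggering thread just puts items
work.put(None)                     # tell the worker to exit
t.join()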

krs1
+2  A: 

If you want discrete, atomic events that can be processed sequentially by each thread, then do as krs1 & bot403 suggested and use a queue. The Python Queue class is synchronized - you do not have to worry about locking to use it.

If however your needs are simpler (the event just tells you that you have data available to read, etc.), you can subscribe/register your threads as observers of an object responsible for triggering the events. This object would maintain a list of the observers' threading.Event objects. On a trigger, it can then call set() on every threading.Event in the list.
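
Something along these lines (a rough sketch; the class and method names are my own, not from any library):

import threading

class EventBroadcaster(object):
    """Keeps one threading.Event per observer and sets them all on a trigger."""

    def __init__(self):
        self._lock = threading.Lock()
        self._events = []

    def register(self):
        ev = threading.Event()
        with self._lock:
            self._events.append(ev)
        return ev

    def unregister(self, ev):
        with self._lock:
            self._events.remove(ev)

    def trigger(self):
        with self._lock:
            for ev in self._events:
                ev.set()   # each observer owns its event, so no one
                           # can consume another thread's wakeup

# Each waiting thread would do something like:
#   ev = broadcaster.register()
#   while True:
#       ev.wait()
#       ev.clear()   # clear only its *own* event
#       # handle the new data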

Jeremy Brown
+1  A: 

You don't need an Event, and you don't need both a Lock and a Queue. All you need is a Queue.

Call queue.put to drop a message in without waiting for it to be delivered or processed.

Call queue.get in the worker thread to wait for a message to arrive.

import threading
import Queue

# One mailbox queue per live worker; broadcast_event() posts to all of them.
active_queues = []

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            data = self.mailbox.get()   # blocks until a message arrives
            if data == 'shutdown':
                print self, 'shutting down'
                return
            print self, 'received a message:', data

    def stop(self):
        # Unregister first so no new broadcasts land in this mailbox,
        # then tell the thread to exit and wait for it.
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.join()


def broadcast_event(data):
    for q in active_queues:
        q.put(data)

t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")

t1.stop()
t2.stop()

The messages don't have to be strings; they can be any Python object.

Jason Orendorff
This assumes that there's a static number of waiting threads all defined at the start, which is not the case. The waiting threads are generated by requests, so the number is dynamic and unknown. The lock was intended to keep additional waiting threads from adding themselves, seeing the event set, removing themselves, and going through - especially since this could, theoretically, keep the queue from ever emptying.
culhantr
@culhantr OK, I rewrote the example.
Jason Orendorff