views: 812, answers: 6

I recently wrote a program that used a simple producer/consumer pattern. It initially had a bug related to improper use of threading.Lock that I eventually fixed. But it made me wonder whether it's possible to implement the producer/consumer pattern in a lockless manner.

Requirements in my case were simple:

  • One producer thread.
  • One consumer thread.
  • The queue has room for only one item.
  • The producer can produce the next item before the current one is consumed. The current item is then lost, but that's OK for me.
  • The consumer can consume the current item before the next one is produced. The current item is then consumed twice (or more), but that's OK for me.

So I wrote this:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

My question is: Is this code thread-safe?

Immediate comment: this code isn't really lockless - I use CPython, and it has the GIL.

I tested the code a little and it seems to work. It translates to a few LOAD and STORE ops, which are atomic thanks to the GIL. But I also know that the `del x` operation isn't atomic when x implements a `__del__` method. So if my item has a `__del__` method and some nasty scheduling happens, things may break. Or not?
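For illustration, consider a hypothetical item type like this (not part of the actual code above): the body of `__del__` is ordinary Python, so the interpreter is free to switch threads between its bytecodes.

import time

class Item:
    """Hypothetical item type, only to illustrate the __del__ concern."""
    def __init__(self, value):
        self.value = value

    def __del__(self):
        # Ordinary Python code: it runs wherever the last reference
        # happens to be dropped -- possibly in the middle of the
        # producer's 'QUEUE_ITEM = i' rebinding -- and a thread
        # switch can occur between these bytecodes.
        print("finalizing", self.value)
        time.sleep(0)  # even an explicit yield point is legal here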

Another question is: What kind of restrictions (for example, on the produced items' type) do I have to impose to make the above code work fine?

My questions are only about the theoretical possibility of exploiting CPython's and the GIL's quirks to come up with a lockless (i.e. no locks like threading.Lock explicitly in the code) solution.

+1  A: 

This is not really thread-safe, because the producer could overwrite QUEUE_ITEM before the consumer has consumed it, and the consumer could consume QUEUE_ITEM twice. As you mentioned, you're OK with that, but most people aren't.

Someone with more knowledge of CPython internals will have to answer your more theoretical questions.

David Locke
Yes, in a way my code is neither thread-safe nor lockless. :) What I mean by 'thread-safe' here is: it doesn't crash, doesn't corrupt memory, doesn't freeze in a deadlock, and works as described by my requirements.
Jasiu
I believe the GIL will protect you from the kinds of errors you just mentioned. The GIL is there to keep Python's internal state correct in the face of threads. Your code may not behave the way you expect (though you basically already said that race conditions are fine for what you want), but I don't think it will be unsafe from the interpreter's point of view, since the interpreter's internal state is guarded by the GIL.
Doug
A: 

I think it's possible that a thread is interrupted while producing/consuming, especially if the items are big objects. Edit: this is just a wild guess. I'm no expert.

Also, the threads may produce/consume any number of items before the other one starts running.

Bastien Léonard
That's a good point; it brings up a possibility I hadn't thought about. But I'll try to defend my solution: AFAIK, Python executes each opcode under a signal mask, so it isn't interrupted mid-opcode and is hence atomic. Otherwise things would get nasty, I suppose, and even regular Python stuff wouldn't work multi-threaded.
Jasiu
A: 

You can use a list as the queue as long as you stick to append/pop since both are atomic.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

With the queue wrapped in a class, as below, you can even clear it.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility the producer is still working on its current item.
    def clear_queue(self):
        self.queue = []

You'll have to find out which list operations are atomic by looking at the generated bytecode.
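For instance, a minimal sketch using the stdlib dis module, assuming CPython:

import dis

def enqueue(queue, item):
    queue.append(item)

def dequeue(queue):
    return queue.pop(0)

# The mutation itself happens inside a single CALL opcode, which runs
# list.append/list.pop in C while holding the GIL, so another thread
# cannot interleave with it (opcode names vary by CPython version).
dis.dis(enqueue)
dis.dis(dequeue)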

null
I suspect you just moved my question from reading/writing a global variable to appending to/popping from a list, but the question remains: will my code or yours work even if some nasty scheduling happens and `__del__` gets called?
Jasiu
Calling `__del__` explicitly, or via `del`? `del` does not immediately delete the object; it just decreases the reference count. As long as the consumer holds a reference to it, it's fine.
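A minimal sketch of that point (note that sys.getrefcount counts its own argument as one extra reference):

import sys

item = object()
alias = item                   # a second reference, e.g. held by the consumer
print(sys.getrefcount(item))   # 3: item, alias, plus getrefcount's argument
del item                       # unbinds one name; the object stays alive
print(alias)                   # still reachable through 'alias'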
null
Let's consider the following scenario:

  1. The queue contains many items.
  2. The consumer calls clear_queue.
  3. The reference counts of the items in the queue drop to zero.
  4. Their __del__ methods are called.
  5. All of this happens during the "self.queue = []" statement.
  6. Meanwhile the producer tries to append another item.

You could replace "self.queue = []" with "del self.queue[:]", but that just moves the problem from accessing the "self.queue" attribute to Python's internal list operations. So IMHO this again only moves the problem from reading/writing a global variable to reading/writing Python's built-in list internals.
Jasiu
Since "self.queue = []" is atleast 2 operations, the creating and the binding to self.queue, there is a chance the producer could access self.queue in the self.queue.append line between those 2 ops. The new item would be lost because it's writing to the old list which will be gone once the next op is executed. The same could happen to consumer and it would consume one item that was meant to be cleared. It's definitely not something I would use where this was critical but it does have it's uses when the guarantee isn't required.
null
+1  A: 

Trickery will bite you. Just use Queue to communicate between threads.
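A minimal sketch of that, reusing the produce_item/consume_item placeholders from the question (note the semantics differ from the original requirements: put blocks when the slot is full and get blocks when it is empty, rather than dropping or re-reading items):

import queue      # the Queue module in Python 2
import threading

q = queue.Queue(maxsize=1)   # a one-slot queue, as in the question

def producer():
    while True:
        q.put(produce_item())     # blocks while the slot is full

def consumer():
    while True:
        consume_item(q.get())     # blocks while the slot is empty

threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()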

Dustin
Yes, that's what I do! :) I wouldn't use such code in a production environment, no way :). It's just a theoretical question :).
Jasiu
A: 

The `__del__` could be a problem, as you said. It could be avoided if only there were a way to prevent the garbage collector from invoking the `__del__` method on the old object before we finish assigning the new one to `QUEUE_ITEM`. We would need something like:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

I'm afraid I don't know if that is possible, though.
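Something close can at least be sketched in pure Python by holding the old object in a local variable across the rebinding (replace_item is a hypothetical helper; the local reference plays the role of the temporary refcount bump):

QUEUE_ITEM = None

def replace_item(new_item):
    """Hypothetical producer-side helper."""
    global QUEUE_ITEM
    old = QUEUE_ITEM        # step 1: the local binding keeps the old object alive
    QUEUE_ITEM = new_item   # step 2: a single STORE_GLOBAL makes the new item visible
    del old                 # step 3: the reference is dropped here, so a __del__ on
                            # the old item runs only after the swap is complete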

Reef
+2  A: 

Yes, this will work in the way that you described:

  1. That the producer may produce a skippable element.
  2. That the consumer may consume the same element.

But I also know that the `del x` operation isn't atomic when x implements a `__del__` method. So if my item has a `__del__` method and some nasty scheduling happens, things may break.

I don't see a "del" here. If a del happens in consume_item then the del may occur in the producer thread. I don't think this would be a "problem".

Don't bother using this, though. You will end up burning CPU on pointless polling cycles, and it is not any faster than using a queue with locks, since Python already has a global lock anyway.

Unknown
By `__del__` I meant that the reference count of an object may drop to zero, and so its `__del__` method will be called. This could lead to some problems, but if you say it's fine, I hope that's the way things are in CPython.
Jasiu