I have a Subject which offers Subscribe(Observer*) and Unsubscribe(Observer*) to clients. Subject runs in its own thread (from which it calls Notify() on subscribed Observers) and a mutex protects its internal list of Observers.

I would like client code - which I don't control - to be able to safely delete an Observer after it is unsubscribed. How can this be achieved?

  • Holding the mutex - even a recursive mutex - while I notify observers isn't an option because of the deadlock risk.
  • I could mark an observer for removal in the Unsubscribe call and remove it from the Subject thread. Then clients could wait for a special 'Safe to delete' notification. This looks safe, but is onerous for clients.

Edit

Some illustrative code follows. The problem is how to prevent Unsubscribe (followed by the client's delete) from happening while Run is at the 'Problem here' comment, because then I could call back on a deleted object. Alternatively, if I hold the mutex throughout rather than making the copy, I can deadlock certain clients.

#include <set>
#include <algorithm>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/mem_fn.hpp>

class Observer
{
public:
    virtual ~Observer() {}
    virtual void Notify() {}
};

class Subject
{
public:
    Subject() : t(boost::bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        boost::mutex::scoped_lock l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o)
    {
        boost::mutex::scoped_lock l(m);
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            std::set<Observer*> notifyList;
            {
                boost::mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            std::for_each(notifyList.begin(), notifyList.end(),
                          boost::mem_fn(&Observer::Notify));
        }
    }

private:
    void WaitForSomethingInterestingToHappen();  // blocks until there is news; defined elsewhere

    // Members are initialised in declaration order, so the set and the mutex
    // are declared before t: they must exist before the thread starts Run().
    std::set<Observer*> observers;
    boost::mutex m;
    boost::thread t;
};

Edit

I can't Notify observers while holding the mutex because of the deadlock risk. The most obvious way this can happen - the client calls Subscribe or Unsubscribe from inside Notify - is easily remedied by making the mutex recursive. More insidious is the risk of intermittent deadlock on different threads.

I'm in a multithreaded environment, so at any point in its execution a thread will typically hold a sequence of locks L1, L2, ... Ln, while another thread holds locks K1, K2, ... Km. A properly written client ensures that different threads always acquire locks in the same order. But when clients interact with my Subject's mutex - call it X - this strategy breaks: calls to Subscribe / Unsubscribe acquire locks in the order L1, L2, ... Ln, X, while calls to Notify from my Subject thread acquire locks in the order X, K1, K2, ... Km. If any Li and Kj can be the same lock on some call path, the client suffers an intermittent deadlock, with little prospect of debugging it. Since I don't control the client code, I can't rule this out.

+2  A: 

Can you change the signature of Subscribe() and Unsubscribe()? Replacing the Observer* with something like shared_ptr<Observer> would make things easier.

EDIT: Replaced "easy" by "easier" above. For an example of how this is difficult to "get right", see the history of the Boost.Signals and of the adopted-but-not-yet-in-the-distribution Boost.Signals2 (formerly Boost.ThreadSafeSignals) libraries.

Éric Malenfant
I thought about this - I could even hold weak_ptr in the Subject, and clients wouldn't even need to unsubscribe. Unfortunately, I can't change this interface, but I will definitely do this in future.
fizzer
While this solves the "who deletes whom" resource problem, without solving the synchronization problem you can still end up with a notification arriving at an observer that has already been unsubscribed - maybe that's not a problem.
Greg Rogers
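A minimal sketch of the shared_ptr idea, with the Subject holding weak_ptr as fizzer suggests, might look like this. It assumes C++11 `std::shared_ptr`, `std::weak_ptr` and `std::mutex` in place of their Boost counterparts, and the class name `WeakSubject` and its `NotifyAll()`/`Size()` members are hypothetical. The notify pass promotes each `weak_ptr` to a `shared_ptr` under the mutex, so an observer cannot be destroyed while it is being notified, and clients never need to call Unsubscribe.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

class Observer {
public:
    virtual ~Observer() {}
    virtual void Notify() = 0;
};

// Hypothetical variant of Subject that never owns its observers.
// C++11 standard-library types stand in for the Boost ones.
class WeakSubject {
public:
    void Subscribe(const std::shared_ptr<Observer>& o) {
        std::lock_guard<std::mutex> lock(m_);
        observers_.push_back(o);
    }

    // One notification pass; in the real design it runs on the Subject thread.
    void NotifyAll() {
        std::vector<std::shared_ptr<Observer>> live;
        {
            std::lock_guard<std::mutex> lock(m_);
            // Promote each weak_ptr; drop entries whose observer is already gone.
            std::vector<std::weak_ptr<Observer>> kept;
            for (const auto& w : observers_) {
                if (auto sp = w.lock()) {
                    live.push_back(sp);
                    kept.push_back(w);
                }
            }
            observers_.swap(kept);
        }
        // 'live' holds strong references, so no observer can be destroyed
        // mid-pass even if its owner releases it concurrently.
        for (const auto& sp : live)
            sp->Notify();
    }

    std::size_t Size() {
        std::lock_guard<std::mutex> lock(m_);
        return observers_.size();
    }

private:
    std::mutex m_;
    std::vector<std::weak_ptr<Observer>> observers_;
};
```

One consequence of this design: if a client drops its last shared_ptr during a pass, the observer's destructor runs on the Subject thread when `live` goes out of scope. Greg Rogers' caveat still applies too: a pass that started before the client let go can deliver one more Notify(), but the object is guaranteed to be alive when it does.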
+1  A: 

Mmm... I don't really understand your question: if a client calls Unsubscribe, you should be able to let the client delete the Observer (you no longer use it). However, if for some reason you cannot end the relationship once the client unsubscribes the observer, you could add a new operation to Subject to safely delete an Observer, or simply one that lets clients signal that they are no longer interested in an Observer.

Rethink edit: OK, now I think I understand what your problem is. I think the best solution is the following:

  1. Give each stored observer element a "valid" flag. This flag decides whether the observer is notified while you're in the notification loop.
  2. You need a mutex to protect access to that "valid" flag. The unsubscribe operation then locks the mutex for the "valid" flag and sets the flag to false for the selected observer.
  3. The notification loop also has to lock and unlock the mutex of the valid flag, and only act upon observers that are "valid".

Given that the unsubscribe operation will block on the mutex to reset the valid flag (and that that particular Observer won't be used any more in your thread), the code is thread safe, and clients can delete any observer as soon as unsubscribe has returned.

Diego Sevilla
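The three steps above could be sketched as follows, again with C++11 standard-library types in place of Boost. `FlaggedSubject`, `Entry` and `NotifyAll()` are hypothetical names, and the per-entry `std::mutex` is one reading of "a mutex to protect the access to that valid flag". Entries are reference-counted so the notify pass can keep using one after Unsubscribe has dropped it from the list; because the entry mutex is held across Notify(), Unsubscribe blocks until an in-progress notification of that observer has finished.

```cpp
#include <list>
#include <memory>
#include <mutex>

class Observer {
public:
    virtual ~Observer() {}
    virtual void Notify() = 0;
};

// Hypothetical class illustrating the per-observer "valid" flag.
class FlaggedSubject {
    struct Entry {
        explicit Entry(Observer* o) : observer(o) {}
        Observer* observer;
        bool valid = true;
        std::mutex m;   // guards 'valid' and is held across Notify()
    };

public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> lock(listMutex_);
        entries_.push_back(std::make_shared<Entry>(o));
    }

    void Unsubscribe(Observer* o) {
        std::shared_ptr<Entry> e;
        {
            std::lock_guard<std::mutex> lock(listMutex_);
            for (auto it = entries_.begin(); it != entries_.end(); ++it) {
                if ((*it)->observer == o) { e = *it; entries_.erase(it); break; }
            }
        }
        if (!e) return;
        // Blocks while the notify loop is inside e->observer->Notify().
        std::lock_guard<std::mutex> lock(e->m);
        e->valid = false;
    }

    void NotifyAll() {
        std::list<std::shared_ptr<Entry>> snapshot;
        {
            std::lock_guard<std::mutex> lock(listMutex_);
            snapshot = entries_;
        }
        for (auto& e : snapshot) {
            std::lock_guard<std::mutex> lock(e->m);
            if (e->valid)
                e->observer->Notify();
        }
    }

private:
    std::mutex listMutex_;
    std::list<std::shared_ptr<Entry>> entries_;
};
```

As with any scheme that holds a lock across the callback, an observer that calls Unsubscribe(this) from its own Notify() would try to relock a std::mutex it already owns, which is undefined behaviour, so this sketch does not support unsubscribing from inside the callback.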
+1  A: 

Would something like this be satisfactory? It still isn't safe to unsubscribe an observer while it is being notified, though; for that you would need an interface like the one you mentioned (as far as I can tell).

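// Assumes Subject has these members: boost::mutex m; boost::condition_variable cond;
// bool ok_to_delete (initially true); std::set<Observer*> observers.
void Subject::Subscribe(Observer* x)
{
    boost::mutex::scoped_lock l(m);
    observers.insert(x);
}

void Subject::Unsubscribe(Observer* x)
{
    boost::mutex::scoped_lock l(m);
    while (!ok_to_delete)
        cond.wait(l);              // releases m while waiting
    observers.erase(x);
}

void Subject::NotifyLoop()
{
    for (;;)
    {
        WaitForSomethingInterestingToHappen();   // wait for something to trigger a notify

        std::set<Observer*> notifyList;
        {
            boost::mutex::scoped_lock l(m);
            ok_to_delete = false;
            notifyList = observers;              // build a list of observers to notify
        }

        // Notify all observers from the list saved earlier, without holding m.
        for (std::set<Observer*>::iterator it = notifyList.begin();
             it != notifyList.end(); ++it)
            (*it)->Notify();

        {
            boost::mutex::scoped_lock l(m);
            ok_to_delete = true;
            cond.notify_all();
        }
    }
}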

If you want to be able to Unsubscribe() inside Notify() (a bad design decision on the client's part, IMO), you can store the thread id of the notifier thread in your data structure. In Unsubscribe you can compare that thread id with the current thread's id (most threading libraries provide this, e.g. pthread_self). If they are the same, you can proceed without waiting on the condition variable.

NOTE: If the client is responsible for deleting the observer, then inside the Notify callback it may unsubscribe and delete the observer while still executing code through that now-dangling this pointer. The client has to be aware of this and delete the observer only at the very end of Notify().

Greg Rogers
But wouldn't that deadlock if an Observer Unsubscribe()s itself from its Notify callback?
Éric Malenfant
+1 as it addresses the crux of the problem. I'd like clients to be able to subscribe from callbacks, though, so I'll need a bit more thinking. It's scary that there's not a well-known idiom for this problem. I see this pattern every day, always broken.
fizzer
fizzer - subscribing from callbacks is fine. You never need to wait on the condition variable when subscribing, only on the mutex (which is explicitly released before calling Notify()). It is only unsubscribing that is the problem.
Greg Rogers
Sorry, I posted the comment last thing at night. I'd like Unsubscribe to work from callbacks. Unsubscribing in response to an event is a very plausible thing for a client to try.
fizzer
Apologies for the delay in responding. I think it's almost there, except that the condition needs to prevent the client from returning (and potentially calling delete). So ok_to_delete needs to be false while in the Notify loop, and the client needs to wait for it *after* 'remove x from list'
fizzer
It doesn't really matter if you wait on the condition before or after you remove them from the list in Unsubscribe. You can still potentially be executing something in Notify with the thing you are removing, and it still doesn't really matter. In fact it may be better to do it this way -
Greg Rogers
- because there is a much smaller chance that you call Unsubscribe, then receive an event, and are still notified (note that Unsubscribe still hasn't returned). The *Gotcha* is still unsubscribing and deleting that pointer inside Notify().
Greg Rogers
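Greg Rogers' condition-variable scheme, including the thread-id check that lets Unsubscribe() be called from inside Notify(), could be sketched as below. It assumes C++11 `std::condition_variable` and `std::this_thread::get_id()` standing in for Boost and `pthread_self`; `WaitingSubject`, `NotifyOnce()` and the member names are hypothetical, and `NotifyOnce()` is one iteration of the answer's NotifyLoop.

```cpp
#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>

class Observer {
public:
    virtual ~Observer() {}
    virtual void Notify() = 0;
};

// Hypothetical class; C++11 types stand in for Boost and pthread_self.
class WaitingSubject {
public:
    void Subscribe(Observer* x) {
        std::lock_guard<std::mutex> lock(m_);
        observers_.insert(x);
    }

    // Waits until no notification pass is running, unless it is called
    // from the notifying thread itself (the thread-id check).
    void Unsubscribe(Observer* x) {
        std::unique_lock<std::mutex> lock(m_);
        if (std::this_thread::get_id() != notifier_)
            cond_.wait(lock, [this] { return okToDelete_; });
        observers_.erase(x);
    }

    // One iteration of NotifyLoop; the "wait for a trigger" step is omitted.
    void NotifyOnce() {
        std::set<Observer*> notifyList;
        {
            std::lock_guard<std::mutex> lock(m_);
            okToDelete_ = false;
            notifier_ = std::this_thread::get_id();
            notifyList = observers_;
        }
        for (Observer* o : notifyList)
            o->Notify();                  // the mutex is not held here
        {
            std::lock_guard<std::mutex> lock(m_);
            okToDelete_ = true;
        }
        cond_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cond_;
    std::set<Observer*> observers_;
    bool okToDelete_ = true;
    std::thread::id notifier_;            // default-constructed: no notifier yet
};
```

The limits from the comments remain: the shortcut only makes self-unsubscription safe if the observer is not deleted until its Notify() returns, and an observer unsubscribed from inside another observer's Notify() is still in the current pass's copy, so deleting it at that point is unsafe.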
+1  A: 

Rather than have clients get a "SafeToDelete" notification, provide them with an IsSubscribed( Observer *) method. The client code then becomes:

subject.Unsubscribe( observer );
while( subject.IsSubscribed( observer ) ) {
   boost::this_thread::sleep_for( boost::chrono::milliseconds( 10 ) );   // or any OS-specific short sleep
}
delete observer;

which is not too onerous.

anon
If IsSubscribed() simply looks in the set of observers, it's still broken. The whole thing could complete while the Subject thread is asleep at the 'Problem here' comment in the example code I've posted.
fizzer
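fizzer's objection can be met if IsSubscribed() also reports observers that are still in the copy the Subject thread is iterating. A minimal sketch under that assumption, using C++11 standard-library types; `PollableSubject`, `NotifyOnce()`, the `inFlight_` member and the `UnsubscribeAndDelete()` helper are hypothetical. Because the copy is published under the same lock that takes it, IsSubscribed() stays true while the Subject thread sits at the 'Problem here' point.

```cpp
#include <chrono>
#include <mutex>
#include <set>
#include <thread>

class Observer {
public:
    virtual ~Observer() {}
    virtual void Notify() = 0;
};

// Hypothetical class: IsSubscribed() also covers the in-flight copy.
class PollableSubject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> lock(m_);
        observers_.insert(o);
    }

    void Unsubscribe(Observer* o) {
        std::lock_guard<std::mutex> lock(m_);
        observers_.erase(o);
    }

    // True while o is subscribed OR still in the copy being notified.
    bool IsSubscribed(Observer* o) {
        std::lock_guard<std::mutex> lock(m_);
        return observers_.count(o) != 0 || inFlight_.count(o) != 0;
    }

    void NotifyOnce() {
        std::set<Observer*> notifyList;
        {
            std::lock_guard<std::mutex> lock(m_);
            inFlight_ = observers_;   // published under the same lock as the copy
            notifyList = inFlight_;
        }
        for (Observer* o : notifyList)
            o->Notify();
        {
            std::lock_guard<std::mutex> lock(m_);
            inFlight_.clear();        // pass over: the copy is no longer used
        }
    }

private:
    std::mutex m_;
    std::set<Observer*> observers_;
    std::set<Observer*> inFlight_;
};

// Client side, as in anon's answer. Must not be called from inside Notify():
// it would wait forever for the pass it is running in.
void UnsubscribeAndDelete(PollableSubject& s, Observer* o) {
    s.Unsubscribe(o);
    while (s.IsSubscribed(o))
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    delete o;
}
```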
+4  A: 

Unsubscribe() should be synchronous, so that it does not return until Observer is guaranteed not to be in Subject's list anymore. That's the only way to do it safely.

ETA (moving my comment to the answer):

Since time doesn't seem to be an issue, take and release the mutex between notifying each observer. You won't be able to use for_each the way you are now, and you'll have to check the iterator to ensure that it's still valid.

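// notifyList is the copy taken in Run(); m and observers are Subject's members.
for (std::set<Observer*>::iterator it = notifyList.begin();
     it != notifyList.end(); ++it)
{
    boost::mutex::scoped_lock l(m);              // take mutex
    if (observers.find(*it) != observers.end())  // check validity: still subscribed?
        (*it)->Notify();                         // notify with the mutex held
}                                                // mutex released here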

That will do what you want.

Rob K
I like that solution. That sounds good.
MarkR
Very good. If only there were some way to code it...
fizzer
Since time doesn't seem to be an issue, take and release the mutex between notifying each observer.
Rob K
I want to avoid holding the mutex while calling client code because of the deadlock risk.
fizzer
The client code you call should execute in the context of your thread, so it shouldn't be an issue. If it tries to take the same mutex, since it's the same thread it will succeed.
Rob K
You can't know if the iterator is valid or not without knowing what every other thread is doing. Once you lose exclusive access to the container you have to be pessimistic and assume all iterators and references to elements in it are invalid.
Greg Rogers
I think one can trust that if the pointer to the observer is still in the observers list, it's usable. So once you've taken the mutex, the only thing to check is whether your current iterator is still good, i.e. whether the Observer* is still in the list.
Rob K
Have you looked at a hand-over-hand locking mechanism for controlling access to the observer list?
Ian Hickman
+1  A: 

You could create a "to-delete queue" in the CSubject type. When you remove the Observer, you could call pSubject->QueueForDelete(pObserver). Then, when the subject thread is between notifications, it could safely delete the observers in the queue.

m-sharp
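The to-delete queue can be sketched as follows, assuming the client hands ownership of the Observer to the Subject when it calls `QueueForDelete()` (the name is from the answer; `QueueingSubject` and `NotifyOnce()` are hypothetical, and C++11 standard-library types replace Boost). Queued observers leave the live set immediately but are only deleted by the Subject thread after a pass, when no copy of the list can still refer to them.

```cpp
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    virtual ~Observer() {}
    virtual void Notify() = 0;
};

// Hypothetical class: the Subject owns and deletes queued observers.
class QueueingSubject {
public:
    ~QueueingSubject() {
        // Assumes the Subject thread has stopped; deletes anything queued
        // after the last pass.
        for (Observer* o : toDelete_)
            delete o;
    }

    void Subscribe(Observer* o) {
        std::lock_guard<std::mutex> lock(m_);
        observers_.insert(o);
    }

    // The client gives up o; the Subject deletes it at a safe point.
    void QueueForDelete(Observer* o) {
        std::lock_guard<std::mutex> lock(m_);
        observers_.erase(o);
        toDelete_.push_back(o);
    }

    void NotifyOnce() {
        std::set<Observer*> notifyList;
        {
            std::lock_guard<std::mutex> lock(m_);
            notifyList = observers_;
        }
        for (Observer* o : notifyList)
            o->Notify();
        // Between notifications: the copy above is no longer used, and every
        // queued observer is absent from the live set, so deletion is safe.
        std::vector<Observer*> doomed;
        {
            std::lock_guard<std::mutex> lock(m_);
            doomed.swap(toDelete_);
        }
        for (Observer* o : doomed)
            delete o;
    }

private:
    std::mutex m_;
    std::set<Observer*> observers_;
    std::vector<Observer*> toDelete_;
};
```

This also makes QueueForDelete() safe to call from inside Notify(): the observer may receive the rest of the current pass, but it is only deleted afterwards. The sketch assumes each observer is queued at most once.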
A: 

I think this does the trick if not very elegantly:

// Observer as in the question.
#include <map>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

class Subject {
public:
    Subject() : t(boost::bind(&Subject::Run, this)) {}

    void Subscribe(Observer* o) {
        boost::mutex::scoped_lock l(m);
        // Heap-allocate: a shared_ptr must never own a stack object.
        observers[o] = boost::shared_ptr<InternalObserver>(new InternalObserver(o));
    }

    void Unsubscribe(Observer* o) {
        boost::mutex::scoped_lock l(m);
        Map::iterator it = observers.find(o);
        if (it != observers.end())
            it->second->exists = false;   // Run() erases the entry later
    }

    void WaitForSomethingInterestingToHappen() {}   // placeholder

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            Map::iterator it;
            {
                boost::mutex::scoped_lock l(m);
                it = observers.begin();
            }
            // Only Run() erases entries, so 'it' stays valid between locks.
            for (;;)
            {
                boost::mutex::scoped_lock l(m);
                if (it == observers.end())
                    break;
                if (it->second->exists)
                {
                    it->second->m_o->Notify();   // called with m held
                    ++it;
                }
                else
                {
                    observers.erase(it++);
                }
            }
        }
    }

private:
    class InternalObserver {
    public:
        InternalObserver(Observer* o) : m_o(o), exists(true) {}
        Observer* m_o;
        bool exists;
    };
    // Keyed by the Observer* itself, so Unsubscribe can find the entry.
    typedef std::map<Observer*, boost::shared_ptr<InternalObserver> > Map;

    Map observers;   // declared before t so it exists when Run() starts
    boost::mutex m;
    boost::thread t;
};
Patrick