I have the following problem.

I have two classes, in this case A and B, which both own a concurrent_queue. The assumption here is that concurrent_queue is a thread-safe queue with a blocking push() function. When an object is enqueued in B, B also accesses the singleton A and enqueues the object there. The effect is that each B has a small queue containing its own objects, while A has one large queue containing them all. Each instance of B could live in a separate thread.

What I am encountering is that frequently a thread will get pre-empted between the two lines of code in B::foo(), meaning A::mQueue contains the object, but B::mQueue does not yet contain the object.

What I am wondering is how I can ensure that when B::foo() is called that the object is either pushed onto both queues or neither queue. It seems to me that I would have to have a mutex in A that B can get a hold of, and lock A's mutex in B::foo().

Does anyone have any suggestions how I could accomplish this, or how I could restructure my code to accomplish this? I am using the Boost.Thread library.

class A
{
public:
    static A& instance() { /* return singleton */ }
    void addToQueue(SomeObject const& obj)
    {
        mQueue.push(obj);
    }
private:
    concurrent_queue<SomeObject> mQueue;
};

class B
{
public:
    void foo()
    {
        SomeObject obj;
        // I would like to guarantee that obj is either present in both queues or in neither
        A::instance().addToQueue(obj);
        mQueue.push(obj);
    }
private:
    concurrent_queue<SomeObject> mQueue;
};

In my actual implementation it is not the same object that gets queued up in A and B; rather, A queues up structs containing pointers to B's, which lets me dequeue everything in A and dequeue from all the B's in the same order in which they were enqueued. But that should be irrelevant to the question.

A: 

You probably do need some form of mutex in order to guarantee atomicity (relative to the rest of your application). Boost.Thread does provide mutex objects, IIRC, so you may want to look into that.
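
For reference, a minimal sketch of that primitive; the mutex and function names here are illustrative only, not from the question:

#include <boost/thread/mutex.hpp>

boost::mutex gBothQueuesMutex; // one mutex shared by everything that touches both queues

void pushToBothQueues(SomeObject const& obj)
{
    boost::mutex::scoped_lock lock(gBothQueuesMutex); // released automatically at end of scope
    // ...push obj onto A's queue and onto B's queue here...
}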

Amber
+2  A: 

You'll need to make your "add the object to both queues" operation atomic, which means putting a lock or some other synchronization primitive around the two function calls. The same applies when removing items from the queues.

boost::mutex looks fit for the job. You'll need a single instance, accessible from anywhere the queues are modified. Since it will also have the same lifetime as A's queue, I suggest you put it in A. Then modify queue accesses so they look like:

A::instance().lockQueue(); //calls A.mQueueAccessMutex.lock(), probably
    A::instance().addToQueue(obj);
    mQueue.push(obj);
A::instance().unlockQueue();

Or, RAII-style:

{
    LockHolder lh(A::instance().getLock()); //lock called in lh's constructor

    A::instance().addToQueue(obj);
    mQueue.push(obj);

    //unlock called in lh's destructor
}
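
Boost.Thread already ships a holder of this kind, boost::mutex::scoped_lock, which can play the role of LockHolder. For completeness, here is a sketch of A's side of this, assuming a boost::mutex member (lockQueue, unlockQueue and getLock are illustrative names taken from the snippets above, not an existing API):

#include <boost/thread/mutex.hpp>

class A
{
public:
    static A& instance(); // return singleton

    void lockQueue()   { mQueueAccessMutex.lock(); }
    void unlockQueue() { mQueueAccessMutex.unlock(); }
    boost::mutex& getLock() { return mQueueAccessMutex; } // for scoped_lock

    void addToQueue(SomeObject const& obj) { mQueue.push(obj); }

private:
    boost::mutex mQueueAccessMutex; // guards the "push to both queues" step
    concurrent_queue<SomeObject> mQueue;
};

void B::foo()
{
    SomeObject obj;
    boost::mutex::scoped_lock lh(A::instance().getLock()); // locks now, unlocks at end of scope
    A::instance().addToQueue(obj);
    mQueue.push(obj);
}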

Note that concurrent_queue will then be redundant since no two threads will be accessing the queue concurrently.

--

And, of course, there's always the very small chance that simply reversing the order you put the items in the queues will solve your problems. :)

aib
A: 

As far as I understand, B::foo() should guarantee that the object is added to both queues, but afterwards access to the two queues should be independent.

In that case you should enhance A with a method that directly locks its queue, or one that returns the mutex guarding that queue (I assume your concurrent_queue is mutex-based). B::foo() should then lock both mutexes, do the pushes, and release both mutexes.

And do not forget to handle exceptions: for example, if adding to the second queue fails, the object should be removed from the first.
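
A sketch of that idea, assuming each class exposes a mutex guarding its queue (getQueueMutex() and removeLastFromQueue() are hypothetical names) and using boost::lock() to acquire both mutexes without risking deadlock:

#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>

void B::foo()
{
    SomeObject obj;

    boost::mutex& ma = A::instance().getQueueMutex(); // hypothetical accessor on A
    boost::mutex& mb = getQueueMutex();               // hypothetical accessor on B

    // Lock both mutexes with a deadlock-avoiding algorithm, then adopt them
    // into RAII guards so they are released even if a push throws.
    boost::lock(ma, mb);
    boost::lock_guard<boost::mutex> la(ma, boost::adopt_lock);
    boost::lock_guard<boost::mutex> lb(mb, boost::adopt_lock);

    A::instance().addToQueue(obj);
    try
    {
        mQueue.push(obj);
    }
    catch (...)
    {
        // Roll back the first push if the second one fails; this assumes the
        // queue offers some way to undo the push (hypothetical here).
        A::instance().removeLastFromQueue();
        throw;
    }
}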

Gregory