My app consists of the main process and two threads, all running concurrently and making use of three fifo-queues:

The fifo-q's are Qmain, Q1 and Q2. Internally the queues each use a counter that is incremented when an item is put into the queue, and decremented when an item is 'get'ed from the queue.

The processing involves two threads:
QMaster, which gets from Q1 and Q2 and puts into Qmain,
Monitor, which puts into Q2,
and the main process, which gets from Qmain and puts into Q1.

The QMaster thread loops, consecutively checking the counts of Q1 and Q2; if any items are in the queues, it gets them and puts them into Qmain.

The Monitor thread loops, obtaining data from external sources, packaging it and putting it into Q2.

The main process of the app also runs a loop checking the count of Qmain; if there are any items, it gets one item from Qmain at each iteration of the loop and processes it further. During this processing it occasionally puts an item into Q1 to be processed later (when it is get'ed from Qmain in its turn).

The problem:
I've implemented everything as described above, and it works for a random (short) time and then hangs. I've managed to trace the crash/hang to the increment/decrement of the count of a fifo-q (it may happen in any of them).

What I've tried:
Using three mutexes: QMAIN_LOCK, Q1_LOCK and Q2_LOCK, which I lock whenever any get/put operation is done on the relevant fifo-q. Result: the app doesn't get going, it just hangs.

The main process must continue running all the time and must not be blocked on a 'read' (named pipes fail, socketpair fails).

Any advice?
I think I'm not implementing the mutexes properly; how should it be done?
(Any comments on improving the above design also welcome)

[edit] Below are the processes and the fifo_q template:
Where and how in this should I place the mutexes to avoid the problems described above?

main-process:
...
start thread QMaster
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X);
        delete X;
    }
    ...
    //at some random time:
    Q1.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        Q2.put(data)
    }
}

QMaster:
{
    while(1)
    {
        if (Q1.count() > 0)
            Qmain.put(Q1.get());

        if (Q2.count() > 0)
            Qmain.put(Q2.get());
    }
}

fifo_q:
template <class X> class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    };
    item *head, *tail;
    int count;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X* x) { item *i=new item(); /*... adds to tail ...*/ count++; }
    X* get() { X *d = head->data; /*... deletes head ...*/ count--; return d; }
    void clear() { /*...*/ }
};
A: 

Are you acquiring multiple locks simultaneously? This is generally something you want to avoid. If you must, ensure you are always acquiring the locks in the same order in each thread (this restricts your concurrency further, which is why you generally want to avoid it).

Other concurrency advice: Are you acquiring the lock prior to reading the queue sizes? If you're using a mutex to protect the queues, then your queue implementation isn't concurrent and you probably need to acquire the lock before reading the queue size.

ceretullis
I do a get-put in a single statement (e.g. Qmain.put(Q2.get())) and lock the two mutexes before and unlock them after.
slashmais
@slashmais: that must not be done: always lock one queue to get a message (into a local variable), unlock that queue, and then lock the other to put the message. If the other thread does the same thing in the opposite order, you will have a deadlock at some point.
stefaanv
How to do it otherwise then?
slashmais
@slashmais: do it the way I proposed, which is the same as in the answer of src.
stefaanv
If you must acquire multiple locks at the same time (and I highly recommend you don't), then you must ensure that the lock acquisition occurs in the same order in every thread (and this is REALLY hard to maintain as correct and hence the warnings against this approach; this approach may not *always* be safe either, it depends on the synchronization primitive being used, you'll have to read up on it).
ceretullis
Also, acquiring locks in the same order means you sometimes have to acquire locks that you don't really need otherwise. E.g. say you had an array of locks[3], and you had one thread that needed locks 0 and 1 concurrently, and another that needed lock 2 only. The second thread would always have to acquire lock 0, lock 1, and then lock 2 to prevent deadlock. Which is yet another reason YOU SHOULD NOT DO THIS.
ceretullis
+1  A: 

You should not lock a second mutex when you have already locked one.

Since the question is tagged C++, I suggest implementing the locking inside the get/put logic of the queue class (e.g. using Boost locks), or writing a wrapper if your queue is not a class.

This allows you to simplify the locking logic.

Regarding the sources you have added: the queue size check and the following put/get should be done in one transaction, otherwise another thread can modify the queue in between.
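
For illustration, a minimal sketch of that idea (names and the use of std::queue plus a pthread mutex are illustrative, not the poster's code): the emptiness test and the pop happen under the same lock, so no other thread can slip in between.

#include <pthread.h>
#include <queue>

// Sketch only: a queue whose "is there anything?" test and pop are one
// locked operation, instead of separately locked count() and get() calls.
template <class T>
class locked_queue
{
    std::queue<T*> q;
    pthread_mutex_t m;
public:
    locked_queue()  { pthread_mutex_init(&m, NULL); }
    ~locked_queue() { pthread_mutex_destroy(&m); }

    void put(T* x)
    {
        pthread_mutex_lock(&m);
        q.push(x);
        pthread_mutex_unlock(&m);
    }

    // Returns NULL when the queue is empty; check and pop are one atomic step.
    T* try_get()
    {
        pthread_mutex_lock(&m);
        T* x = NULL;
        if (!q.empty()) { x = q.front(); q.pop(); }
        pthread_mutex_unlock(&m);
        return x;
    }
};

With this, a consumer loop becomes "if (X* x = Q1.try_get()) Qmain.put(x);" and never touches the size separately.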

Gregory
A: 

hi!

One problem may occur due to this rule: "The main-process must continue running all the time, must not be blocked on a 'read'". How did you implement it? What is the difference between 'get' and 'read'?

The problem seems to be in your implementation, not in the logic. And as you stated, you should not be in any deadlock, because you are not acquiring another lock while already holding one.

Muktadir
+1  A: 

Use the debugger. When your solution with mutexes hangs, look at what the threads are doing and you will get a good idea about the cause of the problem.

What is your platform? In Unix/Linux you can use POSIX message queues (you can also use System V message queues, sockets, FIFOs, ...) so you don't need mutexes.
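
For example, a rough sketch of passing item pointers through a POSIX message queue (Linux, link with -lrt; the queue name "/qmain" and the Work type are made up for illustration). Opened with O_NONBLOCK, the receive side returns immediately when the queue is empty, which fits your requirement that the main process must never block on a read:

#include <mqueue.h>
#include <fcntl.h>

struct Work { /* ... your packaged data ... */ };

// Create/open the queue; each message carries one pointer.
mqd_t open_qmain()
{
    mq_attr attr = {};
    attr.mq_maxmsg  = 8;               // capacity (system limits apply)
    attr.mq_msgsize = sizeof(Work*);
    return mq_open("/qmain", O_CREAT | O_RDWR | O_NONBLOCK, 0600, &attr);
}

void put(mqd_t q, Work* w)
{
    // with O_NONBLOCK this fails with EAGAIN if the queue is full
    mq_send(q, reinterpret_cast<const char*>(&w), sizeof w, 0);
}

Work* get(mqd_t q)   // returns NULL immediately if the queue is empty
{
    Work* w = NULL;
    if (mq_receive(q, reinterpret_cast<char*>(&w), sizeof w, NULL) == -1)
        return NULL;                   // errno == EAGAIN when empty
    return w;
}

(Passing raw pointers like this is fine here because your producers and consumers are threads in the same process.)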

Learn about condition variables. By your description it looks like your QMaster thread is busy-looping, burning your CPU.
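
A minimal sketch of that (not your class; names are illustrative): one mutex plus a condition variable per queue lets a consumer sleep until a producer signals, instead of spinning on count():

#include <pthread.h>
#include <cstddef>

// Sketch only: a blocking FIFO of void* items guarded by one mutex and one
// condition variable, so a consumer like QMaster sleeps while the queue is empty.
struct blocking_fifo
{
    struct node { void* data; node* next; };
    node *head, *tail;
    pthread_mutex_t m;
    pthread_cond_t  cv;

    blocking_fifo() : head(NULL), tail(NULL)
    {
        pthread_mutex_init(&m, NULL);
        pthread_cond_init(&cv, NULL);
    }

    void put(void* d)
    {
        node* n = new node();
        n->data = d; n->next = NULL;
        pthread_mutex_lock(&m);
        if (tail) tail->next = n; else head = n;
        tail = n;
        pthread_cond_signal(&cv);        // wake one sleeping consumer
        pthread_mutex_unlock(&m);
    }

    void* get()                          // blocks until an item is available
    {
        pthread_mutex_lock(&m);
        while (head == NULL)             // loop guards against spurious wakeups
            pthread_cond_wait(&cv, &m);  // releases m while waiting
        node* n = head;
        head = n->next;
        if (head == NULL) tail = NULL;
        void* d = n->data;
        delete n;
        pthread_mutex_unlock(&m);
        return d;
    }
};

Your main process, which must not block, could keep using a non-blocking try-get, or wait with pthread_cond_timedwait and a short timeout; the QMaster and Monitor threads can simply block.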

One of your responses suggests you are doing something like:

Q2_mutex.lock()
Qmain_mutex.lock()
Qmain.put(Q2.get())
Qmain_mutex.unlock()
Q2_mutex.unlock()

but you probably want to do it like:

Q2_mutex.lock()
X = Q2.get()
Q2_mutex.unlock()

Qmain_mutex.lock()
Qmain.put(X)
Qmain_mutex.unlock()

and as Gregory suggested above, encapsulate the logic into the get/put.

EDIT: Now that you posted your code I wonder, is this a learning exercise? Because I see that you are coding your own FIFO queue class instead of using the C++ standard std::queue. I suppose you have tested your class really well and the problem is not there.

Also, I don't understand why you need three different queues. It seems that the Qmain queue would be enough, and then you would not need the QMaster thread that is indeed busy-waiting.

About the encapsulation, you can create a synch_fifo_q class that encapsulates the fifo_q class. Add a private mutex variable and then the public methods (put, get, clear, count,...) should be like put(X) { lock m_mutex; m_fifo_q.put(X); unlock m_mutex; }
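
Something along these lines (a sketch assuming the fifo_q class from the question and pthreads; not tested):

#include <pthread.h>

// Sketch of the wrapper idea: every public method takes the same private
// mutex around the corresponding fifo_q call.
template <class X>
class synch_fifo_q
{
    fifo_q<X> m_q;            // the unsynchronized queue from the question
    pthread_mutex_t m_mutex;
public:
    synch_fifo_q()  { pthread_mutex_init(&m_mutex, NULL); }
    ~synch_fifo_q() { pthread_mutex_destroy(&m_mutex); }

    void put(X* x)
    {
        pthread_mutex_lock(&m_mutex);
        m_q.put(x);
        pthread_mutex_unlock(&m_mutex);
    }

    X* get()
    {
        pthread_mutex_lock(&m_mutex);
        X* x = m_q.get();
        pthread_mutex_unlock(&m_mutex);
        return x;
    }

    int count()
    {
        pthread_mutex_lock(&m_mutex);
        int c = m_q.count();  // assumes fifo_q exposes a count() accessor, as used in the question
        pthread_mutex_unlock(&m_mutex);
        return c;
    }
};

Note that a caller who checks count() and then calls get() through this wrapper is still performing two separately locked steps, which is what the next question is getting at.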

question: what would happen if you have more than one reader from the queue? Is it guaranteed that after a "count() > 0" you can do a "get()" and get an element?

src
@slashmais: Use the google, Luke! The interesting parts are POSIX (IEEE 1003) 1b and 1c. 1b are the real time extensions, including message queues and asynchronous I/O, and 1c are the threads extensions. By encapsulation I mean that the "put" method should probably be like put(X) { lock queue mutex; put X in queue; unlock queue mutex; }.
src
+1  A: 

I wrote a simple application below:

#include <queue>
#include <windows.h>
#include <process.h>
using namespace std;

queue<int> QMain, Q1, Q2;
CRITICAL_SECTION csMain, cs1, cs2;

unsigned  __stdcall TMaster(void*)
{
    while(1)
    {
        if( Q1.size() > 0)
        {
            ::EnterCriticalSection(&cs1);
            ::EnterCriticalSection(&csMain);
            int i1 = Q1.front();
            Q1.pop();
            //use i1;
            i1 = 2 * i1;
            //end use;
            QMain.push(i1);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs1);
        }
        if( Q2.size() > 0)
        {
            ::EnterCriticalSection(&cs2);
            ::EnterCriticalSection(&csMain);
            int i1 = Q2.front();
            Q2.pop();
            //use i1;
            i1 = 3 * i1;
            //end use;
            QMain.push(i1);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs2);
        }
    }
    return 0;
}

unsigned  __stdcall TMoniter(void*)
{
    while(1)
    {
        int irand = ::rand();
        if ( irand % 6 >= 3)
        {
            ::EnterCriticalSection(&cs2);
            Q2.push(irand % 6);
            ::LeaveCriticalSection(&cs2);
        }
    }
    return 0;
}

unsigned  __stdcall TMain(void)
{
    while(1)
    {
        if (QMain.size() > 0)
        {
            ::EnterCriticalSection(&cs1);
            ::EnterCriticalSection(&csMain);
            int i = QMain.front();
            QMain.pop();
            i = 4 * i;
            Q1.push(i);
            ::LeaveCriticalSection(&csMain);
            ::LeaveCriticalSection(&cs1);
        }
    }
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    ::InitializeCriticalSection(&cs1);
    ::InitializeCriticalSection(&cs2);
    ::InitializeCriticalSection(&csMain);
    unsigned threadID;
    ::_beginthreadex(NULL, 0, &TMaster, NULL, 0, &threadID);
    ::_beginthreadex(NULL, 0, &TMoniter, NULL, 0, &threadID);
    TMain();

    return 0;
}
Jinyuan
I've copied this as an example for when I hit this level of problem. Thanks. (thanks outis)
slashmais
+1  A: 

An example of how I would adapt the design and lock the queue access the POSIX way. Note that I would wrap the mutex to use RAII or use Boost threading, and that I would use std::deque or std::queue as the queue, but I'm staying as close as possible to your code:

main-process:
...
start thread Monitor
...
while (!quit)
{
    ...
    if (Qmain.count() > 0)
    {
        X = Qmain.get();
        process(X);
        delete X;
    }
    ...
    //at some random time:
    Qmain.put(Y);
    ...
}

Monitor:
{
    while (1)
    {
        //obtain & package data
        Qmain.put(data)
    }
}

fifo_q:
template <class X> class fifo_q
{
    struct item
    {
        X* data;
        item *next;
        item() { data=NULL; next=NULL; }
    };
    item *head, *tail;
    int count;
    pthread_mutex_t m;
public:
    fifo_q() { head=tail=NULL; count=0; }
    ~fifo_q() { clear(); /*deletes all items*/ }
    void put(X* x) 
    { 
      pthread_mutex_lock(&m);
      item *i=new item(); 
      /*... adds to tail ...*/
      count++; 
      pthread_mutex_unlock(&m);
    }
    X* get() 
    { 
      pthread_mutex_lock(&m);
      X *d = head->data; 
      /*... deletes head ...*/
      count--; 
      pthread_mutex_unlock(&m);
      return d; 
    }
    void clear() { /*...*/ }
};

Note too that the mutex still needs to be initialized, and that count() should also use the mutex.
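
A sketch of what those two remarks could look like in the class above (the accessor name is mine, since a count() method would clash with the count data member):

    fifo_q()
    {
        head = tail = NULL;
        count = 0;
        pthread_mutex_init(&m, NULL);   // the mutex must be initialized too
    }

    int get_count()   // locked read of the counter
    {
        pthread_mutex_lock(&m);
        int c = count;
        pthread_mutex_unlock(&m);
        return c;
    }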

stefaanv
Implemented it your way but it's done now :) Now I'm going to take a break (it's a cool evening in SA)
slashmais