I'm writing a UDP server that receives datagrams, wraps each one in an object, and places it in a concurrent queue. The concurrent queue is the implementation provided here: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

A pool of worker threads pull data out of the queue for processing.
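
For readers who don't want to follow the link, here is a minimal sketch of that kind of queue with a small worker pool. It is not the implementation from the linked article: it assumes C++11 std::thread and std::condition_variable instead of Boost, and the names simple_queue and run_demo, as well as the negative-value stop signal, are made up for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal queue; NOT the concurrent_queue from the linked article.
template <typename T>
class simple_queue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        ready_.notify_one();
    }

    void wait_and_pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate protects against spurious wake-ups.
        ready_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop();
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
};

// Pushes 1..count from the calling thread while `workers` threads pop.
// Returns the sum of all values the workers received.
inline long run_demo(int workers, int count) {
    simple_queue<int> queue;
    std::mutex sum_mutex;
    long sum = 0;

    std::vector<std::thread> pool;
    for (int w = 0; w < workers; ++w) {
        pool.emplace_back([&queue, &sum_mutex, &sum] {
            for (;;) {
                int value = 0;
                queue.wait_and_pop(value);
                if (value < 0) return;  // assumed stop signal, one per worker
                std::lock_guard<std::mutex> lock(sum_mutex);
                sum += value;
            }
        });
    }

    for (int i = 1; i <= count; ++i) queue.push(i);
    for (int w = 0; w < workers; ++w) queue.push(-1);  // tell every worker to exit
    for (std::thread& t : pool) t.join();
    return sum;
}
```

Calling run_demo(4, 100) pushes the values 1 to 100 and lets four workers drain them. Every thread here uses the same simple_queue object, which is the property the rest of this question turns out to depend on.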

The queue is defined globally as:

static concurrent_queue<boost::shared_ptr<Msg> > g_work_queue;

The problem is this: if I write a simple function that produces data and pushes it onto the queue, and create some consumer threads to pull it off, everything works fine. But the moment I add my UDP-based producer, the worker threads stop being notified when data arrives on the queue.

I've tracked the issue down to the end of the push function in concurrent_queue. Specifically, the call the_condition_variable.notify_one(); does not return when I use my network code.

So the problem is related to the way I've written the networking code.

Here is what it looks like.

enum
{
    MAX_LENGTH = 1500
};


class Msg
{
  public:
    Msg()
    {
       static int i = 0;
       i_ = i++;
       printf("Construct ObbsMsg: %d\n", i_);
    }

    ~Msg()
    {
       printf("Destruct ObbsMsg: %d\n", i_);
    }

    const char* toString() { return data_; }

  private:
    friend class server;

    udp::endpoint sender_endpoint_;
    char data_[MAX_LENGTH];
    int i_;
};

class server
{
public:
  server(boost::asio::io_service& io_service)
    : io_service_(io_service),
      socket_(io_service, udp::endpoint(udp::v4(), PORT))
  {
    waitForNextMessage();
  }  

  void waitForNextMessage()
  {
    printf("Waiting for next msg\n");

    next_msg_.reset(new Msg());

    socket_.async_receive_from(
        boost::asio::buffer(next_msg_->data_, MAX_LENGTH), sender_endpoint_,
        boost::bind(&server::handleReceiveFrom, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
  }

  void handleReceiveFrom(const boost::system::error_code& error, size_t bytes_recvd)
  {
    if (!error && bytes_recvd > 0) {
        printf("got data: %s. Adding to work queue\n", next_msg_->toString());
        g_work_queue.push(next_msg_); // Add received msg to work queue
        waitForNextMessage();
    } else {
        waitForNextMessage();
    } 
  }

private:
  boost::asio::io_service& io_service_;
  udp::socket socket_;

  udp::endpoint sender_endpoint_;
  boost::shared_ptr<Msg> next_msg_;
};

int main(int argc, char* argv[])
{
    try{
      boost::asio::io_service io_service;
      server s(io_service);
      io_service.run();
    } catch (std::exception& e) {
      std::cerr << "Exception: " << e.what() << std::endl;
    }
    return 0;
}

Now I've found that if handleReceiveFrom is able to return, then notify_one() in concurrent_queue returns. So I think it's because I have a recursive loop. What's the correct way to start listening for new data? And is the async UDP server example flawed, given that I based my code on what it was already doing?

EDIT: Ok the issue just got even weirder.

What I haven't mentioned here is that I have a class called processor. Processor looks like this:

class processor
{
public:
   processor(int thread_pool_size) :
      thread_pool_size_(thread_pool_size) { }

  void start()
  {
    boost::thread_group threads;
    for (std::size_t i = 0; i < thread_pool_size_; ++i){
        threads.create_thread(boost::bind(&processor::worker, this));
    }
  }

  void worker()
  {
    while (true){
        boost::shared_ptr<Msg> msg;
        g_work_queue.wait_and_pop(msg);
        printf("Got msg: %s\n", msg->toString());
    }
  }

private:
  int thread_pool_size_;
};

Now it seems that if I extract the worker function out on its own and start the threads from main, it works! Can someone explain why a thread function behaves as I would expect outside of a class, but has side effects inside one?

EDIT2: Now it's getting even weirder still

I pulled out two functions (exactly the same).

One is called consumer, the other worker.

i.e.

void worker()
{
    while (true){
        boost::shared_ptr<Msg> msg;
        printf("waiting for msg\n");
        g_work_queue.wait_and_pop(msg);
        printf("Got msg: %s\n", msg->toString());
    }
}

void consumer()
{
    while (true){
        boost::shared_ptr<Msg> msg;
        printf("waiting for msg\n");
        g_work_queue.wait_and_pop(msg);
        printf("Got msg: %s\n", msg->toString());
    }
}

Now, consumer lives at the top of the server.cpp file, i.e. the same file as the server code.

worker, on the other hand, lives in the processor.cpp file.

I'm not using processor at all at the moment. The main function now looks like this:

void consumer();
void worker();

int main(int argc, char* argv[])
{
    try {
        boost::asio::io_service io_service;
        server net(io_service);
        //processor s(7);

        boost::thread_group threads;
        for (std::size_t i = 0; i < 7; ++i){
            threads.create_thread(worker); // this doesn't work
            // threads.create_thread(consumer); // THIS WORKS!?!?!?
        }

//        s.start();

        printf("Server Started...\n");
        boost::asio::io_service::work work(io_service);
        io_service.run();

        printf("exiting...\n");
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

Why is it that consumer is able to receive the queued items, but worker is not? They are identical implementations with different names.

This isn't making any sense. Any ideas?

Here is the sample output when receiving the text "hello world":

Output 1: not working. When calling worker function or using the processor class.

Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1

Output 2: works when calling the consumer function which is identical to the worker function.

Construct ObbsMsg: 0
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
waiting for msg
Server Started...
waiting for msg
got data: hello world. Adding to work queue
Construct ObbsMsg: 1
Got msg: hello world <----- this is what I've been wanting to see!
Destruct ObbsMsg: 0
waiting for msg
A: 

To answer my own question.

It seems the problem lies in the declaration of g_work_queue.

It was declared in a header file as: static concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;

Declaring it static is not what I want. At namespace scope, static gives the variable internal linkage, so every .cpp file that includes the header gets its own separate queue object, with its own mutex and condition variable.

This explains why it worked when the producer and the consumer lived in the same source file: they shared one queue. When they were in different files it failed, because the worker threads were waiting on a different queue object from the one the server was pushing to.

So I've redeclared the work queue like so.

-- workqueue.h --
extern concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;

-- workqueue.cpp --
#include "workqueue.h"
concurrent_queue< boost::shared_ptr<Msg> > g_work_queue;

Doing this fixes the problem.
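
For comparison, another common way to get a single shared object from a header is an inline accessor function with a function-local static. This is a different technique from the extern declaration above, shown only as a sketch. The names work_queue, produce and consume are hypothetical, and std::queue<int> is just a stand-in for concurrent_queue< boost::shared_ptr<Msg> > so that the snippet compiles on its own. Note that std::queue is not thread-safe.

```cpp
#include <queue>

// Stand-in type so the sketch is self-contained; in the real code this
// would be concurrent_queue< boost::shared_ptr<Msg> >.
typedef std::queue<int> work_queue_type;

// Hypothetical accessor that could live in a header. Because the function
// is inline with external linkage, its static local is one object shared
// by every .cpp file that includes the header.
inline work_queue_type& work_queue() {
    static work_queue_type instance;  // constructed on first call
    return instance;
}

// Stands in for code in one source file (e.g. the server).
inline void produce(int value) {
    work_queue().push(value);
}

// Stands in for code in another source file (e.g. the workers).
// Returns false if the queue is empty.
inline bool consume(int& out) {
    if (work_queue().empty()) return false;
    out = work_queue().front();
    work_queue().pop();
    return true;
}
```

Both produce and consume reach the same object through work_queue(), so there is no per-file copy. The extern declaration in workqueue.h with a single definition in workqueue.cpp achieves the same thing and is what fixed the problem here.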

Matt H
Recommend you accept your solution then :)
chrispy