Hey, I have a server application that accepts incoming queries and executes them. If there are too many queries, they should be queued, and as soon as some of the running queries finish, the queued queries should be executed as well. Since I want to pass queries with different priorities, I think using a priority_queue would be the best choice.

E.g. the number of accepted queries (a) hits the limit, so new queries are stored in the queue. All queries have a priority of 1 (lowest). When some of the queries from (a) finish, the program picks the query with the highest priority out of the queue and executes it. Still no problem. Now someone sends a query with a priority of 5, which gets added to the queue. Since this is the query with the highest priority, the application will execute it as soon as the running queries no longer hit the limit. In the worst case, 500 queries with a priority of 1 are queued but won't be executed because someone keeps sending queries with a priority of 5, so these 500 queries will be queued for a looooong time. To prevent that, I want to increase the priority of all queries that have a lower priority than the query being executed, in this example all queries with a priority lower than 5. So when the query with a priority of 5 gets pulled out of the queue, all other queries with a priority < 5 should be increased by 0.2. This way queries with a low priority won't be queued forever, even if there are 100 queries with a higher priority.

I really hope someone can help me to solve the problem with the priorities:

Since my queries consist of an object I thought something like this might work:

class Query {
public:
    Query( std::string p_stQuery ) : stQuery( p_stQuery ) {};
    std::string getQuery() const {return stQuery;};
    void increasePriority( const float fIncrease ) {fPriority += fIncrease;};

    friend bool operator < ( const Query& PriorityFirst, const Query& PriorityNext ) {
        if( PriorityFirst.fPriority < PriorityNext.fPriority ) {
            if( PriorityFirst.fStartPriority < PriorityNext.fStartPriority ) {
                Query qTemp = PriorityFirst;
                qTemp.increasePriority( INCREASE_RATE );
            }

            return true;
        } else {
            return false;
        }
    };

private:
    static const float INCREASE_RATE = 0.2;
    float fPriority; // current priority
    float fStartPriority; // initialised priority
    std::string stQuery;
};
A: 

If you need to modify the priorities, then you can't use a priority_queue because there's no interface to access all the elements. You're better off with std::vector and std::make_heap.

Kristo
A: 

A std::priority_queue doesn't have any way to reorganize itself if your sort criteria change. You'll need to manage it yourself.

I would suggest just using a std::vector and one of two approaches to maintain it.

There are two cases: if you're reprioritizing much more often than you remove elements (which doesn't sound like your case), just keep the vector unsorted and use min_element (or max_element, depending on how your comparison is defined) to find the item to remove when needed.

Otherwise probably better is to use Kristo's approach and maintain your own heap. When you reprioritize call make_heap, to add use push_heap, and to get the top item use pop_heap.
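
As a rough illustration of the heap approach described above, here is a minimal sketch. The names Job, lowerPriority and AgingHeap are hypothetical stand-ins (they are not from the question), the 0.2 increase is taken from the question, and the code assumes C++11.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the question's Query class.
struct Job {
    std::string text;
    double priority;       // current (aged) priority
    double startPriority;  // priority at submission time
};

// Ordering for a max-heap: the highest current priority sits at the front.
inline bool lowerPriority(const Job& a, const Job& b) {
    return a.priority < b.priority;
}

class AgingHeap {
public:
    // Add a job: append it, then sift it up into place.
    void add(const Job& j) {
        jobs_.push_back(j);
        std::push_heap(jobs_.begin(), jobs_.end(), lowerPriority);
    }

    // Remove and return the highest-priority job, then age the rest.
    Job take(double increase = 0.2) {
        assert(!jobs_.empty());
        // pop_heap moves the top element to the back of the vector.
        std::pop_heap(jobs_.begin(), jobs_.end(), lowerPriority);
        Job top = jobs_.back();
        jobs_.pop_back();

        // Age every job that started below the one just taken.
        for (Job& j : jobs_) {
            if (j.startPriority < top.startPriority) j.priority += increase;
        }
        // The selective increase may have reordered jobs, so rebuild the heap.
        std::make_heap(jobs_.begin(), jobs_.end(), lowerPriority);
        return top;
    }

    bool empty() const { return jobs_.empty(); }

private:
    std::vector<Job> jobs_;
};
```

Each take() is linear in the number of queued jobs because of the aging pass and make_heap, which is probably acceptable for a few hundred queued queries.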

Mark B
If you're increasing the priority of each element such that the ordering doesn't change (you're increasing them by the same amount or to a fixed maximum), you've not changed the heap invariant and shouldn't need to recreate the heap.
Andrew Aylett
@andrew you should provide that as an answer
caspin
@Andrew If the lower priority query is, say, 4.9 and the higher priority one is 5.0, and I increment the priority by 0.2, then the order does in fact change (as I read the OP, only the lower priority item gets its priority increased).
Mark B
A: 

The ACE framework probably provides something that could help you. See the classes ACE_Dynamic_Message_Queue and ACE_Laxity_Message_Strategy. I haven't worked with these particular classes yet, so I can't give you an example. But the idea is as follows:

  • You have a message queue shared by two threads
  • In the producer thread, you fill the message queue
  • The message queue will insert new messages at the correct place according to the strategy.
  • In the receiver thread you just read the topmost item from the queue.
jopa
A: 

I would go with a std::deque and build the rest yourself (if you are just using C++ with no external libs that might help). The problem with the other suggestions of make_heap (which is what std::priority_queue uses) is that it isn't stable. Lack of stability in this case means that ordering isn't guaranteed within a priority and starvation is possible.
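
One possible reading of "build the rest yourself", sketched under assumptions: keep the std::deque sorted by descending priority and insert each new query behind all queued queries of the same priority, so order within a priority stays first-in, first-out. Job and StableQueue are hypothetical names, and the code assumes C++11.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <string>

// Hypothetical stand-in for the question's Query class.
struct Job {
    std::string text;
    double priority;
};

class StableQueue {
public:
    void add(const Job& j) {
        // The deque is kept sorted by descending priority. upper_bound
        // returns the first position whose priority is strictly lower than
        // j's, so j lands behind every queued job of equal priority.
        auto pos = std::upper_bound(
            jobs_.begin(), jobs_.end(), j,
            [](const Job& value, const Job& element) {
                return value.priority > element.priority;
            });
        jobs_.insert(pos, j);
    }

    // Remove and return the oldest job among those with the highest priority.
    Job take() {
        assert(!jobs_.empty());
        Job front = jobs_.front();
        jobs_.pop_front();
        return front;
    }

    bool empty() const { return jobs_.empty(); }

private:
    std::deque<Job> jobs_;
};
```

Insertion is linear in the queue length here. If priorities are later raised in place, std::stable_sort with the same descending comparison would restore the order while keeping equal-priority jobs in arrival order.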

stonemetal
Good call on stability.
Matthieu M.
+1  A: 

First of all some comments on your code:

  1. You cannot guarantee that operator < will be called only once per object for each removal (it can be called at both top and pop, or on push, or ...).
  2. You increase the priority of a local copy in the operator function, not of the copy in the queue.
  3. There is no need to use friend functions here to declare an operator<.

I wrote an example that overrides the priority_queue to the specific queue you want, I hope you can continue from here. The behaviour should be implemented in the queue, not in the Query class at all, this only should provide the necessary accessors to allow this behaviour. The Query class should have no knowledge about the Queue.

Basically it copies the size and empty of the normal queue, and creates 2 new methods to insert and get queries (push, pop and top are disabled). Insert just calls push, get calls both top, pop and updates all priorities using a for_each call on the local container. Finally a small program is provided showing functionality.

Also, it is based on the heap managing in pop and push. This will work correctly as far as I know because of the linear change on every element, the order does not change between pushes ;).

#include <algorithm>
#include <queue>
#include <iostream>
#include <string>

class Query
{
public:
    Query( std::string p_stQuery, double priority = 1.0) : stQuery( p_stQuery ) , fPriority(priority), fStartPriority(priority) {};
    std::string getQuery() const
    {
        return stQuery;
    };
    void conditionalIncrease( const Query& otherQuery )
    {
        if (fStartPriority < otherQuery.fStartPriority) fPriority += INCREASE_RATE;
    }

    bool operator < ( const Query& rhs )  const
    {
        return fPriority < rhs.fPriority;
    }

    double getPriority() const
    {
        return fPriority;
    }
private:
    // In-class initialization of a non-integral static needs constexpr (C++11).
    static constexpr double INCREASE_RATE = 0.2;
    double fPriority; // current priority
    double fStartPriority; // initialised priority
    std::string stQuery;
};

class QueryIncreaser
{
private:
    Query base;
public:
    QueryIncreaser(const Query& q) : base(q) {}
    void operator() (Query& q)
    {
        q.conditionalIncrease(base);
    }
};

class QueryQueue : private std::priority_queue<Query>
{
public:
    void insertQuery(const Query& q)
    {
        push(q);
    }
    Query getQuery()
    {
        Query q = top();
        pop();
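        QueryIncreaser increaser(q); // renamed: "comp" would hide priority_queue::comp
        std::for_each(c.begin(), c.end(), increaser);
        // Re-establish the heap in case the conditional increase reordered elements.
        std::make_heap(c.begin(), c.end());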
        return q;
    }
    bool empty() const
    {
        return std::priority_queue<Query>::empty();
    }
    size_t size() const
    {
        return std::priority_queue<Query>::size();
    }
};

int main ()
{
    Query a("hello"), b("world",2.);
    QueryQueue myQueue;
    myQueue.insertQuery(a);
    myQueue.insertQuery(b);
    while (!myQueue.empty())
    {
        std::cout << myQueue.getQuery().getPriority() << std::endl;
    }
    return 0;
}
KillianDS
You should use Composition instead of Inheritance. Also, I am not sure about only increasing queries with a lower priority: you normally pop the highest priority query so... that would simplify the code somewhat :)
Matthieu M.
@Matthieu if there were multiple "highest priority" items in the queue, you would need the "less than" check I think. You could still stop after you hit an item not less than though.
Mark B
Good call! And to say I did the very same remark on another answer... my brain's lacking consistency today.
Matthieu M.
@Matthieu, what I posted is not immediately possible with composition (take a better look at the getQuery method). I do however agree that if you really need the performance you'll probably be better off with your own heap implementation. And indeed, the code can probably be made slightly faster by avoiding for_each :).
KillianDS
I don't see anything in `getQuery` that couldn't be achieved with composition... but I may be missing something, I'm slightly sick atm.
Matthieu M.
+3  A: 

How many discrete priority values do you want to implement? If their number is small (say, 256 levels), then instead of a single priority queue it makes more sense to have 256 simple queues (this is how priority process schedulers are implemented in some OSes). Initially your events sent with priority 1 are placed on queue #1. Then someone sends a prio=25 event, and it is placed on queue #25. The reader reads the event from the highest non-empty queue (in this case #25) and the events on all non-empty queues under 25 are moved up a slot (the entire queue #1 is moved to queue #2, etc). I am pretty sure this could all be done in O(k) (where k is the number of priority levels), either with std::move or with std::swap or with list::splice.

Moving queue #1 to the position previously taken by queue #2 should be O(1) with move or swap, and if the remainder of the prio=25 queue is not empty, then moving all elements up from queue #24 into queue #25 is O(1) if the queues are std::lists and list::splice() is used.
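
The per-level scheme above could be sketched roughly as follows. LevelQueues is a hypothetical name, levels are 0-based with the highest index meaning the highest priority, and the code assumes C++11. Note that this bumps waiting queries by a whole level per retrieval rather than by 0.2 as in the question.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>
#include <vector>

class LevelQueues {
public:
    explicit LevelQueues(std::size_t levels) : levels_(levels) {}

    void add(std::size_t level, const std::string& query) {
        assert(level < levels_.size());
        levels_[level].push_back(query);
    }

    bool empty() const {
        for (const auto& q : levels_) {
            if (!q.empty()) return false;
        }
        return true;
    }

    // Take the oldest query from the highest non-empty level, then move
    // every lower level up by one slot. Cost is O(k) for k levels.
    std::string take() {
        std::size_t top = levels_.size();
        while (top > 0 && levels_[top - 1].empty()) --top;
        assert(top > 0);  // precondition: at least one query is queued
        --top;            // index of the highest non-empty level

        std::string result = levels_[top].front();
        levels_[top].pop_front();

        if (top > 0) {
            // Append the level below behind whatever remains on the top
            // level; splicing a whole list is O(1) and leaves it empty.
            levels_[top].splice(levels_[top].end(), levels_[top - 1]);
            // Shift the remaining lower levels up by swapping each one
            // into the empty slot above it; list::swap is O(1).
            for (std::size_t i = top - 1; i > 0; --i) {
                levels_[i].swap(levels_[i - 1]);
            }
        }
        return result;
    }

private:
    std::vector<std::list<std::string>> levels_;
};
```

With std::list, both the splice and the swaps are constant time, so the cost of take() depends only on the number of levels and not on how many queries are waiting.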

Cubbi
Very interesting idea. I would not use list here, more probably a `vector` of `deque`. Also consider the corner case: retrieving an item of priority 25 does not mean that this queue is empty, so you need to concatenate queue 24 to the end of queue 25. Then clear 24, and then swap 24<->23, 23<->22, etc. down to 1<->0. The concatenation might be longish (depending on the size), but all other operations are simple pointer assignments, so they'll be fast.
Matthieu M.
256 discrete priority values would be definitely the maximum. I really like the idea, still I'm a little bit concerned about the movements and the performance... O(n) would be OK.
Layne
Oops, I meant to write O(1), not O(n), sorry. list::splice is constant time.
Cubbi
Or, to be precise, O(k) where k is the number of priority levels.
Cubbi