views:

315

answers:

5

Is anybody familiar with how I can implement a multiprocessing priority queue in Python?

+1  A: 

There is a bug in multiprocessing.Queue that prevents true FIFO ordering.
Read here.

An alternate way to build a priority queue on top of multiprocessing would certainly be great!

Kevin Boyd
Yeah, but is there a good way to do a priority queue? Is it possible? (Unless I have to specifically write a managing process, which I think would be even harder and more error-prone.)
I haven't found a way yet, will keep you posted though.
Kevin Boyd
+3  A: 

Alas, it's nowhere near as simple as changing the queueing discipline of a good old Queue.Queue: the latter is in fact designed to be subclassed according to a template-method pattern, and overriding just the hook methods _put and/or _get easily allows changing the queueing discipline (in 2.6, explicit LIFO and priority implementations are offered, but they were easy to make even in earlier versions of Python).
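As a sketch of that template-method pattern (using the Python 3 module name queue; in Python 2 it is Queue), overriding the hook methods with a heap yields a priority discipline — this is essentially how the stdlib's own PriorityQueue is built:

```python
import heapq
import queue  # named Queue in Python 2


class HeapPriorityQueue(queue.Queue):
    """Queue.Queue subclass with a priority discipline: entries are
    (priority, item) tuples, dequeued smallest priority number first."""

    def _init(self, maxsize):
        self.queue = []                    # a heap instead of the default deque

    def _qsize(self):
        return len(self.queue)

    def _put(self, entry):
        heapq.heappush(self.queue, entry)

    def _get(self):
        return heapq.heappop(self.queue)
```

Usage: `q.put((2, 'b')); q.put((1, 'a')); q.get()` returns `(1, 'a')`. All the locking in Queue.Queue's public put/get methods is inherited unchanged; only the storage discipline differs.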

For multiprocessing, in the general case (multiple readers, multiple writers), I see no solution for how to implement priority queues except to give up on the distributed nature of the queue; designate one special auxiliary process that does nothing but handle queues, send (essentially) RPCs to it to create a queue with a specified discipline, do puts and gets to it, get info about it, &c. So one would get the usual problems about ensuring every process knows about the aux proc's location (host and port, say), etc (easier if the process is always spawned at startup by the main proc). A pretty large problem, especially if one wants to do it with good performance, safeguards against aux proc's crashes (requiring replication of data to slave processes, distributed "master election" among slaves if master crashes, &c), and so forth. Doing it from scratch sounds like a PhD's worth of work. One might start from Johnson's work, or piggyback on some very general approach like ActiveMQ.
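One practical shortcut along those lines, short of a from-scratch server: multiprocessing's manager machinery already spawns exactly such an auxiliary process, and a stdlib PriorityQueue can be registered with it so every worker talks to the one centrally-hosted queue through proxies. A minimal sketch (Python 3 module names assumed):

```python
import queue  # named Queue in Python 2
from multiprocessing.managers import SyncManager


class QueueManager(SyncManager):
    pass


# The manager's server process hosts the real PriorityQueue; callers
# receive proxy objects that forward puts and gets to it.
QueueManager.register('PriorityQueue', queue.PriorityQueue)

if __name__ == '__main__':
    manager = QueueManager()
    manager.start()                  # spawns the auxiliary server process
    pq = manager.PriorityQueue()     # a proxy; can be handed to worker Processes
    pq.put((5, 'low-priority job'))
    pq.put((1, 'urgent job'))
    print(pq.get())                  # (1, 'urgent job')
    manager.shutdown()
```

This gives the single-aux-process design with no custom RPC layer, though none of the crash-safeguard or replication concerns above are addressed: if the manager process dies, the queue dies with it.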

Some special cases (e.g. single reader, single writer) might be easier, and turn out to be faster for their limited area of application; but then a very specifically restricted spec should be drawn up for that limited area -- and the results would not constitute a (general purpose) "multiprocessing queue", but one applicable only to the given constrained set of requirements.

Alex Martelli
A: 

While this isn't an answer, maybe it can help you develop a multiprocessing priority queue.

Here's a simple priority queue class I wrote using a Python list:

class PriorityQueue:
    """A basic priority queue that dequeues items with the smallest priority number."""
    def __init__(self):
        """Initializes the queue with no items in it."""
        self.array = []

    def enqueue(self, item, priority):
        """Adds an item to the queue."""
        self.array.append([item, priority])

    def dequeue(self):
        """Removes and returns the highest-priority entry (smallest priority
        number); ties go to the earliest-enqueued entry, preserving FIFO order.
        Returns None if the queue is empty."""
        if not self.array:
            return None
        best = 0
        for i in range(1, len(self.array)):
            prio = self.array[i][1]
            if prio is not None and (self.array[best][1] is None
                                     or prio < self.array[best][1]):
                best = i
        return self.array.pop(best)

    def requeue(self, item, newPrio):
        """Changes the specified item's priority."""
        for entry in self.array:
            if entry[0] == item:
                entry[1] = newPrio
                break

    def returnArray(self):
        """Returns the list representation of the queue."""
        return self.array

    def __len__(self):
        """Returns the length of the queue."""
        return len(self.array)
Isaac Hodes
A: 

Depending on your requirements you could use the operating system and the file system in a number of ways. How large will the queue grow and how fast does it have to be? If the queue may be big but you are willing to open a couple files for every queue access you could use a BTree implementation to store the queue and file locking to enforce exclusive access. Slowish but robust.

If the queue will remain relatively small and you need it to be fast you might be able to use shared memory on some operating systems...

If the queue will be small (1000s of entries) and you don't need it to be really fast you could use something as simple as a directory with files containing the data with file locking. This would be my preference if small and slow is okay.
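A minimal sketch of that small-and-slow directory approach (the layout is an assumption, not a standard: the zero-padded priority is encoded in each file name so a plain sort yields queue order, and an atomic rename claims an entry, standing in for file locking on POSIX systems):

```python
import os
import tempfile
import uuid

QUEUE_DIR = tempfile.mkdtemp(prefix='prio_queue_')  # demo location


def enqueue(data, priority):
    """Store one entry; the zero-padded priority in the name sorts entries."""
    name = '%08d-%s' % (priority, uuid.uuid4().hex)
    tmp = os.path.join(QUEUE_DIR, '.' + name)       # dot-prefixed = not yet visible
    with open(tmp, 'w') as f:
        f.write(data)
    os.rename(tmp, os.path.join(QUEUE_DIR, name))   # publish atomically


def dequeue():
    """Claim and return the smallest-priority entry, or None if empty."""
    for name in sorted(n for n in os.listdir(QUEUE_DIR) if not n.startswith('.')):
        src = os.path.join(QUEUE_DIR, name)
        claimed = os.path.join(QUEUE_DIR, '.' + name)
        try:
            os.rename(src, claimed)   # atomic on POSIX: only one process wins
        except OSError:
            continue                  # another process claimed it first
        with open(claimed) as f:
            data = f.read()
        os.remove(claimed)
        return data
    return None
```

Note that within one priority level the uuid suffix breaks ties arbitrarily; encoding a timestamp instead would preserve FIFO order. Slowish, as you say, but each entry survives process crashes.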

If the queue can be large and you want it to be fast on average, then you probably should use a dedicated server process like Alex suggests. This is a pain in the neck however.

What are your performance and size requirements?

Aaron Watters
A: 

I had the same use case. But with a finite number of priorities.

What I ended up doing was creating one Queue per priority; my worker Processes try to get items from those queues, starting with the most important queue and moving on to a less important one only when the higher-priority queue is empty.
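A sketch of that scheme (the names here, like `worker` and the results queue, are illustrative, not from the original post): each worker polls the queues from most to least important, falling through to a lower-priority queue only when every higher one is momentarily empty, and restarting from the top after each item.

```python
import queue   # for the Empty exception (named Queue in Python 2)
import time


def worker(queues, results):
    """Drain queues in priority order; queues[0] is the most important.
    A None item is used here as a shutdown sentinel."""
    while True:
        for q in queues:
            try:
                item = q.get_nowait()
            except queue.Empty:
                continue              # this queue is empty; try the next one
            if item is None:
                return                # sentinel: stop working
            results.put(item)         # stand-in for doing real work on the item
            break                     # restart from the most important queue
        else:
            time.sleep(0.01)          # every queue was empty; back off briefly
```

The same function works unchanged with multiprocessing.Queue objects passed into a multiprocessing.Process, since they expose the same get_nowait interface and raise the same Empty exception. The caveat: a low-priority item already being processed is never preempted, so this approximates, rather than guarantees, strict priority order.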