
Imagine an inverted binary tree with nodes A, B, C, D, E, F on level 0, nodes G, H, I on level 1, node J on level 2, and node K on level 3.

Level 1: G = func(A,B), H = func(C,D), I = func(E,F)

Level 2: J = func(G,H)

Level 3: K = func(J,I).

Each pair of nodes on level 0 must be taken in order (A with B, C with D, E with F). The pairs on level 1 can be processed in any order, but their results must be combined on the next level as shown, and so forth until we end up with the final result, K.

The actual problem is a computational geometry problem in which a sequence of solids is fused together. A is adjacent to B, which is adjacent to C, and so on. The result of fusing A and B (G) is adjacent to the result of fusing C and D (H), and the result of fusing J and I (K) is the final result. Thus you can't fuse G and I, since they are not adjacent. If the number of nodes is not a power of 2, some level ends up with an odd number of nodes, leaving a dangling entity that must be carried up and processed one level further.

Since the fuse process is computationally expensive and memory intensive, but very parallel, I would like to use Python's multiprocessing package and some form of queue. After calculating G = func(A,B), I would like to push the result G onto the queue for the subsequent J = func(G,H) computation. When the queue is empty, the last result is the final result. Keep in mind that a multiprocessing.Queue will not necessarily return results in the order the jobs were submitted, since I = func(E,F) may finish before H = func(C,D).

I have come up with a few (bad) solutions but I'm sure there is an elegant solution just beyond my grasp. Suggestions?
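One way to sidestep the ordering problem described above (I = func(E,F) finishing before H = func(C,D)) is to drop the queue and process one level at a time with `concurrent.futures.ProcessPoolExecutor` from the standard library. This swaps in a different technique from the queue the question asks about, so treat it as a minimal sketch under assumptions: `paren` is a hypothetical stand-in for the fuse, `reduce_by_level` is a hypothetical name, and `func` must be a top-level (picklable) function.

```python
from concurrent.futures import ProcessPoolExecutor


def paren(a, b):
    # Hypothetical stand-in for the expensive fuse; it must be a
    # top-level function so ProcessPoolExecutor can pickle it.
    return "(%s, %s)" % (a, b)


def reduce_by_level(items, func, executor):
    """Combine adjacent pairs level by level; an odd leftover moves up unchanged."""
    level = list(items)
    if not level:
        raise ValueError("need at least one item")
    while len(level) > 1:
        # Executor.map yields results in submission order, so each result
        # keeps its position even if I = func(E, F) finishes before H.
        next_level = list(executor.map(func, level[0::2], level[1::2]))
        if len(level) % 2:
            next_level.append(level[-1])  # dangling entity, e.g. I on level 1
        level = next_level
    return level[0]


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        print(reduce_by_level("ABCDEF", paren, pool))
        # (((A, B), (C, D)), (E, F))
```

The executor is passed in so the same logic can be exercised with a `ThreadPoolExecutor` in tests. The trade-off is a barrier per level: J cannot start until I has also finished, even though J only needs G and H.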

A: 

I couldn't come up with a smart design for a queue, but you can easily replace the queue with one more process, which in my example I called WorkerManager. This process gathers results from all Worker processes and starts a new worker only when two adjacent results are waiting to be joined. This way, you'll never try to join non-adjacent results, so you can ignore "levels" and start computing the next pair as soon as it's ready.

from multiprocessing import Process, Queue

class Result(object):
    '''Result from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two results into one result.'''
    def __init__(self, result_queue, pair):
        self.result_queue = result_queue
        self.pair = pair
        super(Worker, self).__init__()

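    def run(self):
        left, right = self.pair
        # Placeholder computation: replace this string join with the
        # real work (e.g. fusing left.data with right.data).
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        self.result_queue.put(result)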


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs them
    and assigns workers to process them.
    Returns final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []
        super(WorkerManager, self).__init__()

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                return
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda result: result.start)

    def _has_final_result(self):
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        for i in range(len(self._results) - 1):
            left, right = self._results[i], self._results[i + 1]
            if left.end == right.start:
                self._results = self._results[:i] + self._results[i + 2:]
                return left, right

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()

if __name__ == '__main__':
    DATA = [Result(i, i + 1, str(i)) for i in range(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    print(final.start)
    # 0
    print(final.end)
    # 6
    print(final.data)
    # For example:
    # (((0, 1), (2, 3)), (4, 5))

For my example, I used a simple Worker that returns the given data in parentheses, separated by a comma, but you could put any computation in there. In my run, the final result was (((0, 1), (2, 3)), (4, 5)), which means that the algorithm computed (0, 1) and (2, 3) before computing ((0, 1), (2, 3)), and then joined that result with (4, 5). I hope this is what you were looking for.

DzinX
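The same "join adjacent results as soon as both exist" idea can also be expressed without a dedicated manager process: the main process keeps the finished `(start, end, data)` results, submits a fuse for every adjacent pair, and uses `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` to react to whichever job finishes first. This is a minimal sketch of that variant, not the answer's code; `paren`, `_combine` and `eager_reduce` are hypothetical names, and `func` must be picklable when a `ProcessPoolExecutor` is used.

```python
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


def paren(a, b):
    # Hypothetical stand-in for the expensive fuse (top-level so it pickles).
    return "(%s, %s)" % (a, b)


def _combine(func, left, right):
    # Items are (start, end, data); the merged item spans both ranges.
    return (left[0], right[1], func(left[2], right[2]))


def eager_reduce(items, func, executor):
    """Merge adjacent finished results as soon as both neighbours exist."""
    ready = [(i, i + 1, data) for i, data in enumerate(items)]
    if not ready:
        raise ValueError("need at least one item")
    pending = set()
    while True:
        # Pair off every adjacent couple among the finished results.
        ready.sort(key=lambda item: item[0])
        leftovers = []
        i = 0
        while i < len(ready):
            if i + 1 < len(ready) and ready[i][1] == ready[i + 1][0]:
                pending.add(executor.submit(_combine, func, ready[i], ready[i + 1]))
                i += 2
            else:
                leftovers.append(ready[i])
                i += 1
        ready = leftovers
        if not pending:
            # Nothing in flight: the spans cover the whole range with no
            # adjacent pair left, so exactly one result remains.
            return ready[0][2]
        # Block until at least one fuse finishes, in whatever order.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        ready.extend(future.result() for future in done)


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        print(eager_reduce("ABCDEF", paren, pool))
```

As in the WorkerManager version, the final bracketing depends on which jobs finish first: for ABCDEF it is either (((A, B), (C, D)), (E, F)) or ((A, B), ((C, D), (E, F))). This relies on the answer's assumption that any grouping of adjacent solids yields the same final solid.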
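I came up with a solution that looks like:

import multiprocessing
import OCC.BRepAlgoAPI

def fuser(shapes):
    shape1_id, shape1 = shapes[0]
    shape2_id, shape2 = shapes[1]
    fused = OCC.BRepAlgoAPI.BRepAlgoAPI_Fuse(shape1, shape2).Shape()
    return ((shape1_id, shape2_id), fused)

# slices: the ordered list of adjacent solids to fuse
results = [(i, a) for i, a in enumerate(slices)]
pool = multiprocessing.Pool(7)
while len(results) > 1:
    # Pool.map returns results in input order, so no re-sort is needed.
    next_level = pool.map(fuser, list(zip(results[::2], results[1::2])))
    # zip drops an odd trailing shape; carry it up to the next level.
    if len(results) % 2:
        next_level.append(results[-1])
    results = next_level
pool.close()
pool.join()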