views: 307
answers: 7

I have a data feed continuously feeding data packets in. There are 5 threads (A, B, C, D, E) processing the data packets. Note that the 5 threads run at totally different speeds, and together they generate 5 different features (each thread generates 1 feature) for every incoming data packet.

The 5 threads are at different paces: when A has finished analyzing the first 10 packets, B might only have finished packets 1 and 2, and C might not have finished a single packet at all.

My task is to match the results from the 5 threads and start the final analysis once all 5 features for the first 10 data packets are available.

My questions are:

- How do I combine the results from the different threads, making sure the analysis thread is only triggered when a certain number of results are available?
- It seems that I need an aggregator thread checking the availability of the different buffers. I am thinking in terms of locks/conditions. How could I implement such a condition involving different buffers?

I am a total newbie in multithreading. Any suggestion is welcome.

I am using GNU C++ with Boost library.

+2  A: 

Have yourself an "aggregator" thread: this thread gets its input from the worker threads (through non-blocking thread-safe queues, I suggest) and, once a "batch" is ready, pushes it to your "analyzer" thread.

Queues offer the advantage of not blocking any of the workers: the "aggregator" just has to poll the worker queues (each guarded by a condition variable). You can control the polling rate to your liking.

This solution gets around the problem of "synchronize all" situations.
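A minimal sketch of the non-blocking, thread-safe queue this answer assumes, written with C++11 primitives (with an older GCC, the Boost equivalents `boost::mutex` / `boost::condition_variable` are drop-in replacements; the name `ConcurrentQueue` is illustrative):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Minimal thread-safe queue: workers push(), the aggregator either
// polls with try_pop() (never blocking a worker) or sleeps in
// wait_pop() until a worker signals the condition.
template <typename T>
class ConcurrentQueue {
public:
    void push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        cond_.notify_one();  // wake the aggregator if it is waiting
    }

    // Non-blocking: returns false immediately if the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop();
        return true;
    }

    // Blocking: waits on the condition until an item is available.
    T wait_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T value = queue_.front();
        queue_.pop();
        return value;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};
```

The aggregator would own one such queue per worker thread and poll them in turn with `try_pop`, accumulating items until its batch condition is met.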

jldupont
My question is: how do I know the batch is ready using a lock/condition?
Lily
When an item from a queue is ready (the "condition" is signaled), dequeue it, update the batch state variables, and evaluate whether the batch is ready. In other words, there is a "condition" for each queue that regulates access to it. The "aggregator" thread just sits around polling the queues in turn. The algorithm for deciding that your batch is ready would most probably be the same regardless of the implementation you choose. My solution only makes the whole process asynchronous, so that your efficiency requirement is met.
jldupont
A: 

Barriers are the canonical "synchronize all" operation.

However, it sounds like you want a "result count" variable in a critical section that is incremented when a certain amount of work is done. Then you want to "block until the variable is equal to x". That can be accomplished with a spin-lock against the count variable.

Paul Nathan
Right, it's a synchronize-all operation. My problem is that I already have 5 counters for the different threads. What would be an efficient way to check the 5 counters to generate a result count and apply the spin lock to it?
Lily
A: 

Have a container that stores results and a function like this (pseudo code):

std::mutex containerMutex;
std::vector<Result> container;

void storeResult(const Result& result) {
    // RAII lock guarding the shared container
    std::lock_guard<std::mutex> lock(containerMutex);

    container.push_back(result);
    if (container.size() >= ANALYSIS_SIZE) {
        StartAnalysisThread(container);  // hand the analysis thread its own copy
        container.clear();
    }
}

Since the mutex only protects the add-to-container operation, which is fairly quick, it shouldn't cause excessive serialization.

joshperry
+1  A: 

You may want to check the Producer-consumer problem

+1  A: 

Use semaphores and extra boolean 'done' variables. Every time a thread is done, it FIRST writes its answers, THEN sets its 'done' variable, and then calls a 'check' function that checks all threads' 'done' variables and, if they are all true, triggers the analysis thread.

Depending on your performance trade-offs, you probably want only the slowest worker thread to ever call the 'check' function, so the fast ones won't keep locking the 'done' variables for reading. This, of course, depends on knowing which thread is the slowest.

I don't know your reset policy: do you want to wait for 10 fresh inputs every time or analyze the 10 most recent continuously?
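A minimal sketch of the write-answer-first, set-done-second ordering, here using C++11 atomics rather than semaphores (the release/acquire pair guarantees the checker sees the answer once it sees the flag; all names and the int payload are illustrative):

```cpp
#include <array>
#include <atomic>

constexpr int kWorkers = 5;

// One 'done' flag per worker, plus a slot for its answer.
std::array<std::atomic<bool>, kWorkers> done{};  // zero-initialized: all false
std::array<int, kWorkers> answer{};              // illustrative result slots

// A worker stores its answer FIRST, THEN sets its flag with release
// semantics, so a checker doing an acquire load of the flag is
// guaranteed to observe the answer.
void finish(int id, int result) {
    answer[id] = result;
    done[id].store(true, std::memory_order_release);
}

// The 'check' function: true only when every worker has finished.
bool all_done() {
    for (const auto& d : done)
        if (!d.load(std::memory_order_acquire)) return false;
    return true;
}
```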

Emilio M Bumachar
My policy is the latter: analyze the 10 most recent continuously. I know which thread is the slowest, and it's far slower than the other 4.
Lily
+1  A: 

Some pseudocode:

worker thread:
   -> do work
   -> publish result to queue
   -> when 10 results reached, signal my condvar

aggregator thread:
   -> wait on all condvars
   -> lock all result queues, swap in new empty ones
   -> do aggregation processing

The reason for creating new queues is that your aggregation processing may involve significant amounts of locking, and invalidation if items are removed. If you swap new queues onto your worker threads, you need to worry less about locking (especially as the aggregator doesn't need to share its results BACK with the workers).
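The swap step can be sketched like this: the aggregator exchanges the shared queue for a fresh empty one under the lock, then processes the full batch with no lock held (names are illustrative):

```cpp
#include <deque>
#include <mutex>

std::deque<int> results;    // filled by a worker thread
std::mutex results_mutex;

// Swap an empty queue in under the lock, then aggregate from the
// returned batch outside the lock; the worker is only ever blocked
// for the duration of the O(1) swap, never for the aggregation.
std::deque<int> take_batch() {
    std::deque<int> batch;  // new empty queue for the worker
    std::lock_guard<std::mutex> lock(results_mutex);
    batch.swap(results);    // just exchanges internal pointers
    return batch;
}
```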

Chris Kaminski
A: 

With your current design you are limited by the slowest computation; the other threads won't be utilized enough.

If you want to process a lot of packets, I would instead split the work like this:

Distribute the data packets to N identical threads, each of which computes all 5 results in sequence for the packets it receives.

Each thread puts its result packets in a thread-safe FIFO.

Your main thread reads the results and, if needed, reorders them using the packet numbers.
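The reordering step in the main thread could be sketched like this: buffer out-of-order results in a map keyed by packet number and release the longest in-order run (names and the int payload are illustrative):

```cpp
#include <map>
#include <vector>

// Buffers results that arrive out of order and emits them in
// packet-number order.
class Reorderer {
public:
    // Accept one result; returns the packets that became
    // deliverable, in order (possibly empty).
    std::vector<int> accept(int seq, int payload) {
        pending_[seq] = payload;
        std::vector<int> ready;
        // Release the contiguous run starting at the next expected number.
        while (!pending_.empty() && pending_.begin()->first == next_) {
            ready.push_back(pending_.begin()->second);
            pending_.erase(pending_.begin());
            ++next_;
        }
        return ready;
    }

private:
    std::map<int, int> pending_;  // packet number -> result
    int next_ = 0;                // next packet number to emit
};
```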

fa.