I ran into the following problem and cannot decide how to proceed:

I have a class, Reader, that gets a chunk of data every 1/T seconds (the data actually comes from video frames, at 30 frames per second). The chunks are passed to several objects, Detectors, that process the chunks and output a decision. However, the number of chunks each Detector needs to read before making a decision varies; some may need only one chunk, others 51.

I am thinking of having a data buffer where the Reader places the chunks it reads, implementing publish/subscribe to register each Detector, and sending each Detector a signal when there are enough frames in the buffer for it to process. Is this a good approach? Also, what's the best way to manage the buffer and let the Detectors read data from it without making their own copies? Shared pointers?

Thanks a lot!

C

+4  A: 

I'd look into a ring buffer/circular queue. This will allow you to do what you want with only a one-time memory allocation (provided you make the initial buffer size large enough to hold the maximum necessary number of frames).

As for managing access to the buffer, signaling when data is ready and sharing pointers with the reader(s) will work, but if you're using multiple threads some kind of synchronization will be necessary; cf. the producer-consumer problem.
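The ring buffer plus producer-consumer synchronization described above can be sketched with standard C++ primitives. This is an illustrative, simplified single-lock version (all names are made up, not from the answer), allocating once up front and blocking when full or empty:

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Fixed-capacity ring buffer: one allocation at construction time.
// push() blocks while full, pop() blocks while empty (producer-consumer).
template <typename T>
class RingBuffer {
public:
    explicit RingBuffer(std::size_t capacity) : buf_(capacity) {}

    void push(T item) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return count_ < buf_.size(); });
        buf_[(head_ + count_) % buf_.size()] = std::move(item);
        ++count_;
        not_empty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return count_ > 0; });
        T item = std::move(buf_[head_]);
        head_ = (head_ + 1) % buf_.size();
        --count_;
        not_full_.notify_one();
        return item;
    }

private:
    std::vector<T> buf_;
    std::size_t head_ = 0, count_ = 0;  // front index and element count
    std::mutex m_;
    std::condition_variable not_empty_, not_full_;
};
```

Size the buffer for the slowest Detector (e.g. 51 frames) so the one-time allocation covers the worst case.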

bcat
Could you elaborate on "signaling when data is ready and sharing pointers with the reader(s) will work"? Do you mean creating a shared_ptr for each chunk of data?
Lily
+2  A: 

I've recently implemented something similar to what you're describing.

I highly recommend the boost::interprocess library (boost.org for more information).

What you're looking for is boost::interprocess / managed_shared_memory. It's going to look a bit weird at first, but once you get the hang of it, you'll love it.

What you want to do is: Create a managed shared memory segment. Allocate an object that will handle the interprocess communication, using a void_allocator (look up allocators). Implement synchronization mechanisms (boost::interprocess::interprocess_semaphore and boost::interprocess::interprocess_mutex, for instance). Communicate between the separate processes via the managed shared memory.

Maciek
I was thinking about an event handling scheme, so does that mean I don't have to send signals to the Detector threads when there's enough data for them to process? I'm new to multithreading and have not yet grasped the difference between signal passing and multithreading. Thx!
recipriversexclusion
Ummmmm, basically, thanks to the synchronization mechanisms implemented via Boost, the whole thing is asynchronous. For instance, a worker thread encountering a semaphore will wait for the semaphore.post() event without stressing the CPU (unlike a while(condition) {} busy loop would). I honestly recommend it; I learned a ton.
Maciek
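The wait()/post() behavior described in the comment above can be sketched with standard-library primitives standing in for boost::interprocess::semaphore (same interface, but this is an illustrative in-process class, not Boost's implementation):

```cpp
#include <condition_variable>
#include <mutex>

// Counting semaphore sketch: wait() sleeps on a condition variable until
// post() makes the count positive -- no CPU-burning while(condition) {} loop.
class Semaphore {
public:
    explicit Semaphore(unsigned initial = 0) : count_(initial) {}

    void post() {
        std::lock_guard<std::mutex> lock(m_);
        ++count_;
        cv_.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

private:
    unsigned count_;
    std::mutex m_;
    std::condition_variable cv_;
};
```

A Detector thread would call wait() once per chunk it needs; the Reader calls post() after writing each chunk.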
Maciek, would you mind also directing me to the materials you looked at? I have a similar problem here. Sample code or references?
Lily
+2  A: 

I think (also based on your comment to Maciek) you have to start by understanding the difference between threads and processes and how they can communicate.

Regarding the design problem: try to start with a simple design. For instance, use only threads and pass each subscriber a shared_ptr to the job through its own synchronized queue*. Since access to the data is read-only and, AFAICR, boost::shared_ptr is thread-safe for such use, there are no synchronization problems and the data is cleaned up automatically. Don't worry about memory reallocations (yet); just make sure you are using a bounded, O(1) amount of memory per subscriber/thread (as you said, about 51 shared_ptrs at most).

Once you have this working skeleton, you can start optimizing based on the problems you encounter. If reallocations are the problem, you can move to a ring buffer (as suggested by bcat), or you can replace your allocator (/new operator) with a pool allocator. If you have many subscribers, it might be effective to merge the queues into a single one used by all the threads. Doing that requires more information (what if one thread is very slow due to a very long computation? Do you have some way to signal it to stop processing? Or should the queue grow? In that case, a cyclic buffer may not work so well...) and may have its complications, but remember we are only trying to save the room occupied by the shared_ptrs (not the jobs).

Bottom line: try to avoid premature optimization. Instead, write it with reasonable optimization and extensibility in the design and go on from there based on what you learn.

Good luck

* Synchronized queue: a queue between threads. push(j) adds a job, and pop() waits until the queue is non-empty and returns the front job (unlike std::queue; this is important when the queue is read by more than one thread). I usually implement it by wrapping a std::queue and protecting it with a boost::mutex.
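A sketch of that synchronized queue, using std::mutex and std::condition_variable in place of the boost::mutex the footnote mentions (the class name and details are illustrative):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Wraps a std::queue; push() never blocks, pop() blocks until a job
// is available, so multiple consumer threads can safely call pop().
template <typename Job>
class SynchronizedQueue {
public:
    void push(Job j) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(j));
        }
        cv_.notify_one();  // wake one waiting consumer
    }

    Job pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        Job j = std::move(q_.front());
        q_.pop();
        return j;
    }

private:
    std::queue<Job> q_;
    std::mutex m_;
    std::condition_variable cv_;
};
```

With Job being a shared_ptr to read-only frame data, each subscriber thread loops on pop() and the memory is reclaimed automatically when the last subscriber drops its pointer.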

Oren S
Thanks for your detailed suggestions Oren S.
recipriversexclusion
I am not sure about "its own synchronized queue". Do you mean each subscriber has its own queue?
Lily
Yeap. In a simple system (subscribers only get jobs), the subscriber registers only its queue or, preferably, its queue's push() method, because it only needs to expose push(job) functionality.
Oren S
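A sketch of the registration scheme from the last comment; Reader, Frame, and the callback signature are hypothetical names, not from the thread. Each subscriber hands the Reader a callable wrapping its queue's push(), and the Reader fans a shared_ptr out to everyone without copying the frame data:

```cpp
#include <functional>
#include <memory>
#include <vector>

struct Frame { int id; };                       // stand-in for real frame data
using FramePtr = std::shared_ptr<const Frame>;  // read-only, shared, no copies

class Reader {
public:
    // A subscriber registers only a push(job) callable, e.g. a lambda
    // that forwards into its own synchronized queue.
    void subscribe(std::function<void(FramePtr)> push) {
        subscribers_.push_back(std::move(push));
    }

    // Publish one chunk: every subscriber receives a shared_ptr to the
    // same immutable frame.
    void publish(FramePtr frame) {
        for (auto& push : subscribers_) push(frame);
    }

private:
    std::vector<std::function<void(FramePtr)>> subscribers_;
};
```

In the real system each callback would be the push() of a per-subscriber synchronized queue, keeping the Reader unaware of the Detectors' internals.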