I'm considering a multi-threaded architecture for a processing pipeline. My main processing module has an input queue, from which it receives data packets. It then performs transformations on these packets (decryption, etc.) and places them into an output queue.

The threading comes in where many input packets can have their contents transformed independently from one another.

However, the punchline is that the output queue must have the same ordering as the input queue (i.e., the first packet pulled off the input queue must be the first one pushed onto the output queue, regardless of the order in which the transformations finish).

Naturally, there will be some kind of synchronisation at the output queue, so my question is: what would be the best way of ensuring that this ordering is maintained?

+1  A: 

That's going to be implementation-specific. One general solution is to number the input items and preserve the numbering so you can later sort the output items. This could be done once the output queue is filled, or it could be done as part of filling it. In other words, you could insert them into their proper position and only allow the queue to be read when the next available item is sequential.
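As a minimal sketch of that last variant (the names here are hypothetical, not from any particular implementation): workers deposit completed items under their sequence number, and the consumer blocks until the next sequential number has arrived.

    // Reorder buffer: Put() may be called in any order by the workers,
    // but Take() releases items strictly in sequence-number order.
    using System.Collections.Generic;
    using System.Threading;

    class ReorderBuffer<T>
    {
        private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
        private long _next;                  // sequence number to emit next

        public void Put(long seq, T item)
        {
            lock (_pending)
            {
                _pending.Add(seq, item);
                Monitor.PulseAll(_pending);  // the next sequential item may be here
            }
        }

        public T Take()
        {
            lock (_pending)
            {
                T item;
                while (!_pending.TryGetValue(_next, out item))
                    Monitor.Wait(_pending);  // block until that item arrives
                _pending.Remove(_next);
                _next++;
                return item;
            }
        }
    }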

edit

I'm going to sketch out a basic scheme, trying to keep it simple by using the appropriate primitives:

  1. Instead of queueing a Packet into the input queue, we create a future value around it and enqueue that into both the input and output queues. In C#, you could write it like this:

    // Whichever thread reads .Value first runs Process(packet); any thread
    // that reads it concurrently blocks until that single run completes.
    var future = new Lazy<Packet>(() => Process(packet),
        LazyThreadSafetyMode.ExecutionAndPublication);
    
  2. A thread from the pool of workers dequeues a future from the input queue and forces it by reading future.Value, which runs the delegate on demand and returns once the packet has been processed.

  3. One or more consumers dequeue a future from the output queue. Whenever they need the value of the packet, they call future.Value, which returns immediately if a worker thread has already run the delegate, and otherwise blocks until it completes (or runs it itself if no worker has started it yet).

Simple, but works.
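As a fuller sketch of how those three steps might be wired together, assuming .NET's BlockingCollection for the queues (Packet, Process and Consume are stand-ins, not part of the original outline):

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    class Packet { /* payload omitted */ }

    static class Pipeline
    {
        static Packet Process(Packet p) { /* decrypt, transform, ... */ return p; }
        static void Consume(Packet p) { /* hand the result downstream */ }

        public static void Run(BlockingCollection<Packet> input)
        {
            var work = new BlockingCollection<Lazy<Packet>>();
            var output = new BlockingCollection<Lazy<Packet>>();

            // Single reader: enqueuing each future into both queues at once
            // is what pins the output order to the input order.
            Task.Run(() =>
            {
                foreach (var packet in input.GetConsumingEnumerable())
                {
                    var future = new Lazy<Packet>(() => Process(packet),
                        LazyThreadSafetyMode.ExecutionAndPublication);
                    work.Add(future);
                    output.Add(future);
                }
                work.CompleteAdding();
                output.CompleteAdding();
            });

            // Workers force the futures in whatever order they grab them.
            for (int i = 0; i < Environment.ProcessorCount; i++)
                Task.Run(() =>
                {
                    foreach (var f in work.GetConsumingEnumerable())
                    {
                        var unused = f.Value;  // runs Process exactly once
                    }
                });

            // The consumer sees packets in input order, blocking only on stragglers.
            foreach (var f in output.GetConsumingEnumerable())
                Consume(f.Value);
        }
    }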

Steven Sudit
great, +1 for the Lazy. Still, you implicitly mean some arbiter that reads the real input queue, which is an input to the encapsulated processing engine; this arbiter puts the 'futured' value into two internal queues: one for workers, and one for output. Workers process and signal the future; the consumer gets a value and waits until the processing is finished (here it may happen that the consumer calls Value before a worker does, which is never wanted). So this effectively gives us the same scheme that Anthony and I proposed earlier.
ULysses
@ULysses: You're absolutely correct that this is not *fundamentally* different, and I'm sure I didn't claim it was. The difference is that I've recast the algorithm to avoid unnecessary complexity. And it's a genuine simplification: I really only have two queues here, not three, and there isn't a single ad hoc synchronization primitive to be found. The code is so much simpler that it's hard to see how a defect could sneak in during development, which is exactly my goal.
Steven Sudit
+1  A: 

With the following assumptions:

  • there should be one input queue, one output queue and one working queue
  • there should be only one input queue listener
  • output message should contain a wait handle and a pointer to worker/output data
  • there may be an arbitrary number of worker threads

I would consider the following flow, with a code sketch after the three lists:

Input queue listener does these steps:

  1. extracts an input message
  2. creates output message:
    1. initializes worker data struct
    2. resets the wait handle
  3. enqueues the pointer to the output message into the working queue
  4. enqueues the pointer to the output message into the output queue

Worker thread does the following:

  1. waits on the working queue to extract a pointer to an output message from it
  2. processes the message based on the given data and sets the event when done

Consumer does the following:

  1. waits on the output queue to extract a pointer to an output message from it
  2. waits on a handle until the output data is ready
  3. does something with the data
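A sketch of this flow in C#, assuming blocking queues built on BlockingCollection (the type and member names are my stand-ins):

    using System.Collections.Concurrent;
    using System.Threading;

    class Packet { /* payload omitted */ }

    class OutputMessage
    {
        public Packet Input;                        // worker data, set by the listener
        public Packet Result;                       // filled in by a worker
        public readonly ManualResetEventSlim Done =
            new ManualResetEventSlim(false);        // signalled when Result is valid
    }

    static class Flow
    {
        static readonly BlockingCollection<OutputMessage> workingQueue =
            new BlockingCollection<OutputMessage>();
        static readonly BlockingCollection<OutputMessage> outputQueue =
            new BlockingCollection<OutputMessage>();

        static Packet Process(Packet p) { /* transform */ return p; }

        // Input queue listener: the order is fixed here, before any processing.
        static void OnInput(Packet packet)
        {
            var msg = new OutputMessage { Input = packet };
            workingQueue.Add(msg);   // for some worker to pick up
            outputQueue.Add(msg);    // same reference, so the sequence is preserved
        }

        // Worker thread body.
        static void Worker()
        {
            foreach (var msg in workingQueue.GetConsumingEnumerable())
            {
                msg.Result = Process(msg.Input);
                msg.Done.Set();      // mark this message ready for consumption
            }
        }

        // Consumer thread body.
        static void Consumer()
        {
            foreach (var msg in outputQueue.GetConsumingEnumerable())
            {
                msg.Done.Wait();     // block until this particular message is ready
                /* do something with msg.Result */
            }
        }
    }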
ULysses
This has one phase too many and doesn't do anything to ensure that the sequence is maintained.
Steven Sudit
Actually, I do like this: because there is only one input queue listener, and it is the only actor responsible for placing messages on the output queue, it can ensure that sequence itself. Additionally, because the input queue listener knows when the output queue becomes non-empty, it can schedule the next processing phase as well. Lots of details fall out of this.
Kaz Dragon
The wait handles should be integral to the queues, not ad hoc. Enqueuing sets the handle. Dequeuing that leaves the pipe empty clears the handle, causing consumers to block. With the right abstractions, the problem looks simple, not like this.
Steven Sudit
@Steven Sudit: this scheme is *designed* to ensure that the sequence is maintained. What you say about queues in your last comment is surely true, and it is the essence of blocking queues. If you read carefully, you will notice that both listeners are `waiting on a queue`, which tells you that these are blocking queues. But blocking queues by themselves aren't enough, so a wait handle is there to carry *another* piece of information about each message: whether that message is ready for consumption. Any message may take an arbitrary time to process, and this will not stall the other worker threads.
ULysses
My complaint is not with the use of wait handles, but with their use being ad hoc. The example I just wrote, using futures, also has a lock in there somewhere, but it's part of the `Lazy<T>` class.
Steven Sudit
I really struggled to choose an "accepted" answer here... all of the good answers have the same gem: "have the single thread that reads from the input queue put a placeholder on the output queue at the same time." So simple, I'm shocked I overlooked it. I was sorting via identifiers and all sorts. Anyway, this was the first one I read that gave me the "ah-hah!" moment. Thanks.
Kaz Dragon
@Kaz: In that case, I humbly suggest that you mark Anthony's answer as accepted, since he was the first to mention parallel insertion. All I did was flesh it out a bit.
Steven Sudit
A: 

If you are using a windowed approach (a known number of elements), use an array for the output queue; for example, in media streaming, where you discard packets that haven't been processed quickly enough.

Otherwise, use a priority queue (a special kind of heap, often implemented on top of a fixed-size array) for the output items.

You need to add to each data packet a sequence number, or any other datum on which the items can be sorted. A priority queue is a tree-like structure that keeps the items ordered across insert/pop operations.
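A sketch of the output side built that way, assuming the sequence numbers are stamped by whoever reads the input queue (PriorityQueue<,> is the .NET 6+ class; the gate on the expected sequence number is what keeps a finished later item from being served early):

    using System.Collections.Generic;
    using System.Threading;

    class OrderedOutput<T>
    {
        private readonly PriorityQueue<T, long> _heap = new PriorityQueue<T, long>();
        private long _expected;                 // next sequence number to release

        public void Add(long seq, T item)
        {
            lock (_heap)
            {
                _heap.Enqueue(item, seq);       // min-heap: lowest seq at the front
                Monitor.PulseAll(_heap);
            }
        }

        public T Take()
        {
            lock (_heap)
            {
                T item;
                long seq;
                while (!_heap.TryPeek(out item, out seq) || seq != _expected)
                    Monitor.Wait(_heap);        // front isn't the next item yet
                _heap.Dequeue();
                _expected++;
                return item;
            }
        }
    }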

Careful. A priority queue does fulfill the requirement for sorting the output, but nothing prevents item 2 from entering the priority queue, going straight to the front, and being served out, all before item 1 is even done.
Steven Sudit
@Steven Sudit Sure it doesn't. A priority queue by itself also does nothing in terms of synchronization. If you ask for a proper solution to said problem, you are probably asking for too much here.
ToxicAvenger
I don't think this solves the problem I asked. Using a priority queue to maintain the correct sequence of packets when processing them in parallel implies, to me, that each packet has to know its own priority. For that, they may as well just carry around a number.
Kaz Dragon
@Kaz Dragon If the choice is between adding a datum to each packet to facilitate ordering, versus some data structure that holds the order of the packets while references are passed to the threads and synchronization devices indicate whether processing is complete, I'd use a sequence indicator any time. Much less coupling, better separation.
ToxicAvenger
Using a priority queue in this problem makes us fall back to a PQ in which each subsequent item added has an ever-lower priority, assigned by an arbiter reading the input queue. But that is just an ordinary queue, and to avoid what ToxicAvenger has noted, the new item has to be put onto the output queue *before* any processing. That said, we have to agree that there should be another pipeline that transfers the items to workers for processing, distinct from the output queue that ensures the sequence of items. And with that, I can just point you back to my answer, which has all the described elements.
ULysses
I agree that we would need to add a placeholder to the output queue, but doesn't that obviate the need for it to be a *priority* queue?
Steven Sudit
+2  A: 

Have a single thread read the input queue, post a placeholder on the output queue, and then hand the item over to a worker thread to process. When the data is ready, the worker thread updates the placeholder. When the thread that needs the value from the output queue reads the placeholder, it can then block until the associated data is ready.

Because only a single thread reads the input queue, and this thread immediately puts the placeholder on the output queue, the order in the output queue is the same as that in the input. The worker threads can be numerous, and can do the transformations in any order.

On platforms that support futures, they are ideal as the placeholder. On other systems you can use an event, monitor or condition variable.
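On .NET, for instance, a TaskCompletionSource can play the placeholder role; this is a sketch under that assumption, with Packet, Process and Consume as stand-ins:

    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    class Packet { /* payload omitted */ }

    static class PlaceholderPipeline
    {
        static Packet Process(Packet p) { /* transform */ return p; }
        static void Consume(Packet p) { /* hand downstream */ }

        public static void Run(BlockingCollection<Packet> input)
        {
            var output = new BlockingCollection<Task<Packet>>();

            // Single reader: post the placeholder *before* dispatching the
            // work, so the output order is fixed up front.
            Task.Run(() =>
            {
                foreach (var packet in input.GetConsumingEnumerable())
                {
                    var tcs = new TaskCompletionSource<Packet>();
                    output.Add(tcs.Task);                           // placeholder, in order
                    Task.Run(() => tcs.SetResult(Process(packet))); // a worker fills it in
                }
                output.CompleteAdding();
            });

            // The consumer blocks on .Result only if this item's worker isn't done.
            foreach (var task in output.GetConsumingEnumerable())
                Consume(task.Result);
        }
    }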

Anthony Williams
+1 for mentioning futures. They're one of the more elegant solutions to this problem.
Steven Sudit
This is a very nice and easy-to-read description. I wish I could have written something like it in my post above, since it describes *exactly* how the proposed workflow functions, in a generic way.
ULysses
I've modified my answer to show an example of how it can work using futures.
Steven Sudit