A: 

Actually, you shouldn't need to accumulate the chunks. Most operating systems and languages provide a random-access file abstraction that would allow each thread to independently write its output data to the correct position in the file without affecting the output data from any of the other threads.
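
As a rough sketch, assuming POSIX pwrite() and fixed-length output records (CHUNK_SIZE and write_chunk are illustrative names, not something from your code; error checking omitted):

#include <fcntl.h>
#include <unistd.h>
#include <vector>
#include <cstddef>

const std::size_t CHUNK_SIZE=4096; // assumed fixed record length

// Each worker writes its finished chunk directly at the right offset.
// pwrite() takes an explicit offset and doesn't touch the shared file
// position, so threads sharing one descriptor need no locking here.
void write_chunk(int fd,std::size_t chunk_index,
                 std::vector<char> const& data)
{
    pwrite(fd,data.data(),data.size(),
           static_cast<off_t>(chunk_index)*CHUNK_SIZE);
}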

Or are you writing to a truly serial output, like a socket?

Steve Emmerson
Truly serial -- a stream cipher.
Crashworks
Your solution only works if the length of the output records is known before the processing completes.
Nick Johnson
A: 

I wouldn't use a reorderable buffer at all, personally. I'd create one 'job' object per job, and, depending on your environment, either use message passing or mutexes to receive completed data from each job in order. If the next job isn't done, your 'writer' process waits until it is.
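
A rough sketch of what I mean, using C++11 threads; job, NUM_JOBS, complete_job and write_output are illustrative names, and a single condition variable stands in here for per-job mutexes or message passing:

#include <condition_variable>
#include <mutex>
#include <string>
#include <vector>
#include <cstddef>

const std::size_t NUM_JOBS=20; // however many jobs fit in memory

struct job
{
    bool done=false;
    std::string result; // placeholder for the real output type
};

std::mutex jobs_mutex;
std::condition_variable job_done;
std::vector<job> jobs(NUM_JOBS);

void write_output(std::string const&); // the actual output routine

// Worker: store the result, mark the job complete, wake the writer
void complete_job(std::size_t i,std::string result)
{
    std::lock_guard<std::mutex> lk(jobs_mutex);
    jobs[i].result=std::move(result);
    jobs[i].done=true;
    job_done.notify_one();
}

// Writer: consume jobs strictly in ascending order, blocking on each
// one until its worker has finished
void writer()
{
    for(std::size_t i=0;i<jobs.size();++i)
    {
        std::unique_lock<std::mutex> lk(jobs_mutex);
        job_done.wait(lk,[&]{return jobs[i].done;});
        write_output(jobs[i].result);
    }
}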

Nick Johnson
I'm afraid I don't follow what you mean. Do you mean that I should have n mutexes, one for each job, and that the writer should wait on each of them in ascending order? The trouble with this is that I only have memory to hold about twenty jobs at a time, and if the current window happens to complete in reverse order, that will leave several of the threads idle until the "head" one completes.
Crashworks
That's what I was suggesting, yes. I don't think any other solution will do better if tasks complete in reverse order, except for Steve's suggestion (if your records have a known length) or caching completed results to disk.
Nick Johnson
A: 

I would use a ringbuffer with the same length as the number of threads you are using. The ringbuffer would also have the same number of mutexes.

The ringbuffer must also know the id of the last chunk it has written to the file; that chunk corresponds to index 0 of the ringbuffer.

On each add to the ringbuffer, you check whether you can write, i.e. whether index 0 is set; if so, you can write one or more chunks at a time to the file.

If index 0 is not set, simply block the current thread and wait. You could also make the ringbuffer 2-3 times the length of your number of threads and lock only when appropriate, i.e. when enough jobs to fill the buffer have been launched.

Don't forget to update the last chunk written though ;)

You could also use double buffering when writing to the file.
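
A minimal sketch of the idea, assuming C++11; slot, RING_SIZE, add_chunk and write_to_file are illustrative names, and the check that a producer never runs more than RING_SIZE chunks ahead is omitted:

#include <mutex>
#include <string>
#include <vector>
#include <cstddef>

const std::size_t RING_SIZE=16; // e.g. 2-3 times the thread count

struct slot
{
    bool ready=false;
    std::string data; // placeholder for the real chunk type
};

std::mutex ring_mutex;
std::vector<slot> ring(RING_SIZE);
std::size_t next_to_write=0; // id of the next chunk due on disk ("index 0")

void write_to_file(std::string const&); // the actual output routine

// Called by a worker when chunk 'id' is finished; if the chunk at the
// head of the ring is ready, flush every contiguous ready chunk and
// advance the last-written marker
void add_chunk(std::size_t id,std::string data)
{
    std::lock_guard<std::mutex> lk(ring_mutex);
    ring[id%RING_SIZE].data=std::move(data);
    ring[id%RING_SIZE].ready=true;
    while(ring[next_to_write%RING_SIZE].ready)
    {
        write_to_file(ring[next_to_write%RING_SIZE].data);
        ring[next_to_write%RING_SIZE].ready=false;
        ++next_to_write; // update the last chunk written
    }
}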

Alex Rouillard
A: 

Have the output queue contain futures rather than the actual data. When you retrieve an item from the input queue, immediately post the corresponding future onto the output queue (taking care to ensure that this preserves the order; see below). When the worker thread has processed the item, it can then set the value on the future. The output thread can read each future from the queue and block until that future is ready. If later ones become ready early, this doesn't affect the output thread at all, provided the futures are in order.

There are two ways to ensure that the futures on the output queue are in the correct order. The first is to use a single mutex for reading from the input queue and writing to the output queue. Each thread locks the mutex, takes an item from the input queue, posts the future to the output queue and releases the mutex.

The second is to have a single master thread that reads from the input queue, posts the future to the output queue and then hands the item off to a worker thread to execute.

In C++ with a single mutex protecting the queues this would look like:

#include <thread>
#include <mutex>
#include <future>
#include <queue>

struct work_data{};
struct result_data{};

std::mutex queue_mutex;
std::queue<work_data> input_queue;
std::queue<std::future<result_data> > output_queue;

result_data process(work_data const&); // do the actual work

void worker_thread()
{
    for(;;) // substitute an appropriate termination condition
    {
        std::promise<result_data> p;
        work_data data;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(input_queue.empty())
            {
                // Nothing queued yet: leaving this scope releases the
                // lock before retrying (a busy-wait; production code
                // would use a condition variable)
                continue;
            }
            data=input_queue.front();
            input_queue.pop();
            // Push the future while still holding the mutex, so that
            // futures appear on the output queue in the same order as
            // items are taken from the input queue
            std::promise<result_data> item_promise;
            output_queue.push(item_promise.get_future());
            p=std::move(item_promise);
        }
        p.set_value(process(data));
    }
}

void write(result_data const&); // write the result to the output stream

void output_thread()
{
    for(;;) // or whatever termination condition
    {
        std::future<result_data> f;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(output_queue.empty())
            {
                continue; // no future queued yet; release the lock and retry
            }
            f=std::move(output_queue.front());
            output_queue.pop();
        }
        write(f.get()); // get() blocks until the worker sets the value
    }
}
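
And a rough sketch of the second (master thread) variant, reusing the declarations above; std::async stands in here for handing the item off to a worker pool:

void master_thread()
{
    for(;;) // substitute an appropriate termination condition
    {
        work_data data;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(input_queue.empty())
            {
                continue;
            }
            data=input_queue.front();
            input_queue.pop();
        }
        // Only this thread pushes to the output queue, so the futures
        // are queued in exactly the order the items were read
        std::future<result_data> f=std::async(std::launch::async,process,data);
        std::lock_guard<std::mutex> lk(queue_mutex);
        output_queue.push(std::move(f));
    }
}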
Anthony Williams
A: 

The Enterprise Integration Patterns book calls this a Resequencer (p282/web).

Pete Kirkham