Have the output queue contain futures rather than the actual data. When you retrieve an item from the input queue, immediately post the corresponding future onto the output queue (taking care to ensure that this preserves the order --- see below). When the worker thread has processed the item it can then set the value on the future. The output thread can read each future from the queue, and block until that future is ready. If later ones become ready early this doesn't affect the output thread at all, provided the futures are in order.
There are two ways to ensure that the futures on the output queue are in the correct order. The first is to use a single mutex for reading from the input queue and writing to the output queue. Each thread locks the mutex, takes an item from the input queue, posts the future to the output queue and releases the mutex.
The second is to have a single master thread that reads from the input queue, posts the future on the output queue and then hand the item off to a worker thread to execute.
In C++ with a single mutex protecting the queues this would look like:
#include <thread>
#include <mutex>
#include <future>
struct work_data{};
struct result_data{};
std::mutex queue_mutex;
std::queue<work_data> input_queue;
std::queue<std::future<result_data> > output_queue;
result_data process(work_data const&); // do the actual work
void worker_thread()
{
for(;;) // substitute an appropriate termination condition
{
std::promise<result_data> p;
work_data data;
{
std::lock_guard<std::mutex> lk(queue_mutex);
if(input_queue.empty())
{
continue;
}
data=input_queue.front();
input_queue.pop();
std::promise<result_data> item_promise;
output_queue.push(item_promise.get_future());
p=std::move(item_promise);
}
p.set_value(process(data));
}
}
void write(result_data const&); // write the result to the output stream
void output_thread()
{
for(;;) // or whatever termination condition
{
std::future<result_data> f;
{
std::lock_guard<std::mutex> lk(queue_mutex);
if(output_queue.empty())
{
continue;
}
f=std::move(output_queue.front());
output_queue.pop();
}
write(f.get());
}
}