I have the following code in a function:

    num_procs.times do
      pid = fork
      unless pid
        DRb.start_service
        ts = Rinda::TupleSpaceProxy.new(DRbObject.new_with_uri('druby://localhost:53421'))
        loop do
          item = ts.take([:enum, nil, nil])
          # our termination tuple
          break if item == [:enum, -1, nil]
          result =
            begin
              block.call(item[2])
            rescue Object => e
              e
            end
          # return result
          ts.write([:result, item[1], result])
        end
        DRb.stop_service
        exit!
      end
      pids << pid
    end

    pts = Rinda::TupleSpace.new

    # write termination tuples
    items.size.times do
      pts.write([:enum, -1, nil])
    end

    items.each_with_index { |item, index|
      pts.write([:enum, index, item])
    }

    DRb.start_service('druby://localhost:53421', pts)

    # Grab results
    items.size.times do
      result_tuples << pts.take([:result, nil, nil])
    end

    pp "Waiting for pids: #{pids.inspect}" if FORKIFY_DEBUG
    pids.each { |p| Process.waitpid(p) }
    DRb.stop_service

    # gather results and sort them
    result_tuples.map { |t|
      results[t[1]] = t[2]
    }

    return results

This code forks num_procs child processes. Each child connects to the parent's tuple space through a Rinda::TupleSpaceProxy over DRb and takes [:enum, index, item] tuples from it. The parent writes one such tuple per item, and each child calls the block on the items it takes. The children write their results back as [:result, index, result] tuples, which the parent collects and puts back in order by index.
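To make the tuple protocol itself concrete, here is a rough single-process sketch of it. It is not the library code: threads stand in for the forked children, there is no DRb, and `items`, `num_workers`, and the squaring `block` are made-up values for illustration. It also writes the termination tuples only after every result has been collected, which differs from the code above, so that the outcome does not depend on the order in which the tuple space returns matching tuples.

```ruby
require 'rinda/tuplespace'

# Hypothetical inputs, chosen only for illustration.
items       = [1, 2, 3, 4, 5]
num_workers = 2
block       = ->(x) { x * x }

ts = Rinda::TupleSpace.new

# Workers: threads standing in for the forked children.
workers = Array.new(num_workers) do
  Thread.new do
    loop do
      # Blocks until a tuple whose first element is :enum is available.
      _tag, index, item = ts.take([:enum, nil, nil])
      break if index == -1 # termination tuple
      result =
        begin
          block.call(item)
        rescue StandardError => e
          e # hand the exception back as the result
        end
      ts.write([:result, index, result])
    end
  end
end

# Parent: one work tuple per item, tagged with its index.
items.each_with_index { |item, index| ts.write([:enum, index, item]) }

# Collect exactly one result per item and slot it by its index.
results = Array.new(items.size)
items.size.times do
  _tag, index, value = ts.take([:result, nil, nil])
  results[index] = value
end

# One termination tuple per worker, written after all results are in.
num_workers.times { ts.write([:enum, -1, nil]) }
workers.each(&:join)

p results # => [1, 4, 9, 16, 25]
```

The index carried in each tuple is what lets the parent restore the original order, since results can arrive in any order.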
This code is in a library, so I don't want the user to have to start something like beanstalkd or equivalent just to queue jobs for the process pool. My question is this:
Is there a better way to do cross-process queue communication?