I have a large file (hundreds of megs) that consists of filenames, one per line.

I need to loop through the list of filenames, and fork off a process for each filename. I want a maximum of 8 forked processes at a time and I don't want to read the whole filename list into RAM at once.

I'm not even sure where to begin, can anyone help me out?

+4  A: 

It sounds like the Process module will be useful for this task. Here's something I quickly threw together as a starting point:

include Process

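i = 0 # number of live children; also reused as the label written to numbers.txt
for line in open('files.txt') do
    i += 1
    # run the shell command in a child process
    fork { `sleep #{rand} && echo "#{i} - #{line.chomp}" >> numbers.txt` }

    if i >= 8
        wait # join any single child process
        i -= 1 # one slot is free again, so the label stays at 8 from here on
    end
end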

waitall # join all remaining child processes

Output:

$ cat files.txt
hello
goodbye

test1
test2
a
b
c
d
e
f
g
$ ruby b.rb
$ cat numbers.txt 
1 - hello
3 - 
2 - goodbye
5 - test2
6 - a
4 - test1
7 - b
8 - c
8 - d
8 - e
8 - f
8 - g

The way this works is that:

  • for line in open(XXX) will lazily iterate over the lines of the file you specify.
  • fork spawns a child process that executes the given block; here the block uses backticks to run a command through the shell. rand returns a float from 0 up to (but not including) 1, so each child sleeps for less than a second, and line.chomp removes the trailing newline from line.
  • If 8 or more child processes are running, call wait, which blocks the parent until any one of them exits.
  • Finally, outside the loop, call waitall to join all remaining processes before exiting the script.
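
The same steps can be sketched as a small reusable method. This is only a sketch: fork_each_line and its max_children parameter are hypothetical names, not part of Ruby or of the script above. It keeps the count of live children separate from the line number, so the label keeps increasing past 8.

```ruby
# Hypothetical helper (not part of Ruby's standard library): forks one child
# per line of `path`, never keeping more than `max_children` alive at once.
# Returns the number of children that were started.
def fork_each_line(path, max_children: 8)
  running = 0   # children currently alive
  started = 0   # 1-based line number, used only as a label
  File.foreach(path) do |line|   # reads the file one line at a time
    if running >= max_children
      Process.wait               # block until any one child exits
      running -= 1
    end
    started += 1
    fork { yield line.chomp, started }   # the child runs the caller's block, then exits
    running += 1
  end
  Process.waitall                # reap the children that are still running
  started
end

# Example use (assumes a files.txt in the current directory):
#   fork_each_line('files.txt') { |name, n| puts "#{n} - #{name}" }
```

Because fork copies the parent's memory, each child sees the values of line and started as they were at the moment it was forked.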
Mark Rushakoff
Thanks! That looks very promising. I'd been playing around with forks in ruby, but hadn't done "include Process." What does that buy you?
Sam
Never mind. I figured it out :) Thanks again!
Sam
A: 
File.foreach("large_file").each_slice(8) do |eight_lines|
  # eight_lines is an array containing 8 lines.
  # at this point you can iterate over these filenames
  # and spawn off your processes/threads
end
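
One way to fill in the body of that loop, as a rough sketch: spawn_in_batches, command and batch_size are hypothetical names, and it assumes each filename is passed as the last argument to an external command. Every batch has to finish completely before the next one starts, so a single slow process holds up the other slots.

```ruby
# Hypothetical helper: runs `command` (an array such as ['wc', '-l']) once per
# line of `path`, `batch_size` processes at a time.
# Returns the number of processes that were started.
def spawn_in_batches(path, command, batch_size: 8)
  started = 0
  # File.foreach without a block returns an enumerator, so each_slice pulls
  # only `batch_size` lines from the file at a time.
  File.foreach(path, chomp: true).each_slice(batch_size) do |names|
    # Passing the arguments separately avoids going through the shell.
    pids = names.map { |name| Process.spawn(*command, name) }
    pids.each { |pid| Process.wait(pid) }   # wait for the whole batch
    started += pids.size
  end
  started
end

# Example use (assumes a files.txt in the current directory):
#   spawn_in_batches('files.txt', ['wc', '-l'])
```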
glenn jackman
This won't work, as it would spawn N/8 processes (N being the number of lines in the file). You could do `each_slice(N/8)` instead, but that would require loading the whole file into an array, which the OP wants to avoid.
Mladen Jablanović
I was assuming that within the loop, the OP would spawn off 8 processes and wait for them before continuing. I'm chaining enumerable methods, so it's not going to read the whole file at once.
glenn jackman
Ah, sorry. But that way is not very efficient either, as the program has to wait for all eight processes to finish before spawning the next eight, so you will have 100% process utilization only part of the time. In the extreme case, seven processes finish quickly and you still have to wait for the one long-running process that is left.
Mladen Jablanović
that is true...
glenn jackman
A: 

Here's Mark's solution wrapped up as a ProcessPool class. It might be helpful to have around (and please correct me if I've made a mistake):

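class ProcessPool
  def initialize(pool_size)
    @pool_size = pool_size
    @free_slots = @pool_size
  end

  # Forks a child for the block, first waiting for a free slot if the pool is full.
  def fork(&p)
    if @free_slots == 0
      Process.wait # block until any one child exits
      @free_slots += 1
    end
    @free_slots -= 1
    puts "Free slots: #{@free_slots}"
    Process.fork(&p)
  end

  # Waits for all remaining children to exit.
  def waitall
    Process.waitall
  end
end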

pool = ProcessPool.new 8
for line in open('files.txt') do
  pool.fork { Kernel.sleep rand(10); puts line.chomp }
end
pool.waitall
puts 'finished'
Mladen Jablanović
A: 

The standard library documentation for Queue has this example:

require 'thread'

queue = Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join

I do find it a little verbose though.

Wikipedia describes this as a thread pool pattern
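
Applied to the question, the same idea might look like the sketch below. run_with_thread_pool, command and workers are hypothetical names, and it assumes each filename is handed to an external command. A SizedQueue is used instead of a plain Queue so that the reader blocks once the queue is full and the filename list is never held in memory all at once.

```ruby
require 'thread'

# Hypothetical helper: a fixed pool of worker threads pulls filenames from a
# bounded queue, and each worker runs one external command at a time.
def run_with_thread_pool(path, command, workers: 8)
  queue = SizedQueue.new(workers * 2)   # bounded: << blocks while the queue is full
  pool = Array.new(workers) do
    Thread.new do
      # pop returns nil once the queue has been closed and drained
      while (name = queue.pop)
        system(*command, name)          # runs the command and waits for it to exit
      end
    end
  end
  File.foreach(path, chomp: true) { |name| queue << name }
  queue.close                           # tell the workers that no more names are coming
  pool.each(&:join)
end

# Example use (assumes a files.txt in the current directory):
#   run_with_thread_pool('files.txt', ['wc', '-l'])
```

Unlike a fixed batch of eight, a worker here picks up the next filename as soon as its own command exits, so one slow file does not stall the other seven slots.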

Andrew Grimm