views:

76

answers:

3
+3  Q: 

perl process queue

I have a Perl script which forks a number of sub-processes. I'd like to have some kind of functionality like xargs --max-procs=4 --max-args=1 or make -j 4, where Perl will keep a given number of processes running until it runs out of work.

It's easy to say fork four process and wait for them all to complete, and then fork another four, but I'd like to keep four or n processes running at the same time, forking a new process as soon as one completes.

Is there a simple way in Perl to implement such a process pool?

+6  A: 

Check out Parallel::ForkManager -- it does much of what you describe. You can set a maximum number of processes, and the callback function could start a new child as soon as one finishes (as long as there is work to do).

Ether
+6  A: 

Forks::Super can handle this requirement.

use Forks::Super MAX_PROC => 5, ON_BUSY => [ block | queue ];

Calls to fork() can block until the number of active subprocesses falls below 5, or you can pass additional parameters to the fork call and the tasks to perform can queue up:

fork { sub => sub { ... task to run in subprocess ... } }

When one subprocess finishes, another job on the queue will start up.

(I am the author of this module).

mobrule
+1  A: 

While I would almost always use a CPAN module, or write something with the fantastic AnyEvent modules I think its important to understand how these things work under the hood. Here's an example that has no dependencies other than perl. The same approach could also be written in C without too much trouble.

#!/usr/bin/env perl

use strict;

## run a function in a forked process
sub background (&) {
  my $code = shift;

  my $pid = fork;
  if ($pid) {
    return $pid;
  } elsif ($pid == 0) {
    $code->();
    exit;
  } else{
    die "cant fork: $!"
  }
}

my @work = ('sleep 30') x 8;
my %pids = ();
for (1..4) {
  my $w = shift @work;
  my $pid = background {
    exec $w;
  };
  $pids{$pid} = $w; 
}

while (my $pid = waitpid(-1,0)) {
  if ($?) {
    if ($? & 127) {
      warn "child died with signal " . ($? & 127);
    } else {
      warn "chiled exited with value " . ($? >> 8);
    }

    ## redo work that died or got killed
    my $npid = background {
      exec $pids{$pid};
    };
    $pids{$npid} = delete $pids{$pid};
  } else {
    delete $pids{$pid};

    ## send more work if there is any
    if (my $w = shift @work) {
      my $pid = background {
        exec shift @work;
      };
      $pids{$pid} = $w;
    }
  }
}
eclark