views:

252

answers:

4

Recently I had a problem using (pipe |-) when I wanted to communicate between two processes. Basically, the child process couldn't process STDIN as fast as it was filled up by parent. This caused parent to wait until STDIN was free and made it run slow.

How big can STDIN be and is it possible to modify it. If yes, what is the best practice size?

Here is some code sample to show what I mean:

if ($child_pid = open($child, "|-"))
{
    $child->autoflush(1);

    # PARENT process
    while (1)
    {

             # Read packet from socket save in $packet
             process_packet($packet);

             # forward packet to child
             print $child $packet;
     }
}
else
{
     die "Cannot fork: $!" unless defined $child_pid;
     # CHILD process
     my $line;  

     while($line = <STDIN>)
     {
         chomp $line;
         another_process_packet($line);
     }
}

In this sample another_process_packet slower than process_packet. The reason I write the code like this is, I want to use same data comes from socket and actually get it once.

Thanks in advance.

+1  A: 

The size is set in the kernel. You can either recompile the kernel with a higher limit or use an intermediary buffer process.

robertc
+5  A: 

You can of course buffer in the parent process, and only write to the child when the child's fd is writable (i.e., writing won't block). You can do this yourself with the right args to syswrite, or use an event loop:

use AnyEvent;
use AnyEvent::Handle;

# make child, assume you write to it via $fh

my $done = AnyEvent->condvar;
my $h = AnyEvent::Handle->new( fh => $fh );

while( you do stuff ){
    my $data = ...;
    $h->push_write($data); # this will never block
}

$h->on_drain(sub { $done->send });
$done->wait; # now you block, waiting for all writes to actually complete

Edit: This used to be untested, but I tested it, and it works. (I used perl -ne "sleep 1; print $_" as the slow child.) Writes proceed during the while loop, if possible, but never block the loop. At the end, you actually block until all the writes have completed.

My test scripts are on gist.github: http://gist.github.com/126488

You can see how the child blocks the blocking loop, but how it doesn't block the non-blocking loop. Obvious when you put it that way ;)

(Finally, as a general rule of thumb; if you are interacting with the network or with other processes, you should probably be using an event loop.)

jrockway
Thanks BTW, but I don't want to block parent for any reason. Probably I need to read more about AnyEvent. Thanks.
Omid
Well, don't block then. I only block at the end because the process is about to exit, and if you exit with unwritten data, the child will never see it.If this is a long-running process, you won't need to block.
jrockway
Well, it seems AnyEvent::Handle works pretty good, but I still have some problem. After I stop the child for a while to make STDIN buffer full, parent pushes the data into wbuf and that's what it suppose to do but the thing is after I release child, then child only process those data it was saved in STDIN buffer and never touch wbuf. Then to force wbuf to push its data into STDIN, I use on_drain(sub {$done->send}) and a line after I use $done->recv to make parent wait, but during this moment I lose some data as they come from socket.Any idea how I can handle this issue???
Omid
A: 

Process handle contains a member function named 'blocking'. Just set the blocking to 0, and the parent process will not be blocked.

if ($child_pid = open($child, "|-"))
{
    $child->blocking(0);    # Key to the solution.
    $child->autoflush(1);

    # PARENT process
    while (1)
    {

             # Read packet from socket save in $packet
             process_packet($packet);

             # forward packet to child
             print $child $packet;
     }
}
else
{
     die "Cannot fork: $!" unless defined $child_pid;
     # CHILD process
     my $line;  

     while($line = <STDIN>)
     {
         chomp $line;
         another_process_packet($line);
     }
}
Yan Cheng CHEOK
Thanks Cheok :)
Omid
Child doesn't work properly.
Omid
A: 

Well after a lot of testing I figured out that I need to write another process that independently control my buffer with AnyEvent::Handle (Thanks jrockway).

In this way child can't stop it for any reason.

This is how I implement it:

# Parent code and creating buffer process as well ...
# ...

else
{
    ############################################################################
    # FIFO BUFFER PROCESS
    ############################################################################
    die "Cannot fork: $!" unless defined $buffer_pid;

    my $cond_var = AnyEvent->condvar;

    my $parent_stdin_handle;
    $parent_stdin_handle = AnyEvent::Handle->new(
     fh => \*STDIN,
     on_eof => sub { 
      $cond_var->send;
     },
    );

    my $db_child_pid;
    my $db_child;

    if ($db_child_pid = open($db_child, "|-"))
    {
     my $db_child_handle;
     $db_child_handle = AnyEvent::Handle->new( 
      fh => $db_child,
      on_eof => sub {
       undef $db_child_handle;
      },
     );

     $parent_stdin_handle->on_read( sub {
      my $handle = shift;

      # Read a line
      $handle->unshift_read( 
       line => sub {
        my ($handle, $line) = @_;
        $db_child_handle->push_write($line."\n");
       }
      );
     });

     ####################################################################
     # DESTROY SEQUENCE
     # 
     # The sequence below shouldn't be changed, unless parent can't exit 
     # properly and even hanged as it tied with its children. (Quite 
     # responsible ;) )
     ####################################################################
     $cond_var->recv;
     # Closing db_child
     close $db_child;
     # Destroy parent_stdin_handle
     undef $parent_stdin_handle;
     ####################################################################
     # DESTROY SEQUENCE DONE
     ####################################################################

    }
    else 
    {
     # Child process get data from STDIN and if it stock for some reason
     # it won't stop buffer neither parent 
    }

}

Like to here comments ...

Omid