views:

436

answers:

2

Basically I would like to:

  1. Read a large amount of data from the network into an array into memory.
  2. Asynchronously write this array data, running it thru bzip2 before it hits the disk.

repeat..

Is this possible? If this is possible, I know that I will have to somehow read the next pass of data into a different array as the AIO docs say that this array must not be altered before the async write is complete. I would like to background all of my writes to disk in order as the bzip2 pass is going to take much longer than the network read.

Is this doable? Below is a simple example of what I think is needed, but this just reads a file into array @a for testing.

use warnings;
use strict;
use EV;
use IO::AIO;
use Compress::Bzip2;
use FileHandle;
use Fcntl;


my @a;

print "loading to array...\n";
while(<>) {
  $a[$. - 1] = $_;
}
print "array loaded...\n";


my $aio_w = EV::io IO::AIO::poll_fileno, EV::WRITE, \&IO::AIO::poll_cb;


aio_open "./out", O_WRONLY || O_NONBLOCK, 0, sub {
  my $fh = shift or die "error while opening: $!\n";

  aio_write $fh, undef, undef, $a, -1, sub {
    $_[0] > 0 or die "error: $!\n";
    EV::unloop;
  };
};

EV::loop EV::LOOP_NONBLOCK;
A: 

You may be interested in how Perlbal handles operations like this. I believe it uses Danga::Socket to accomplish something very similar to what you want to do.

Nick Gerakines
+1  A: 

Asynchronously write this array data

FYI, write()s are pretty much always asynchronous. Unless of course you fill up the OS write cache.

You would gain very little from using the AIO compared to starting a plain pipe, e.g., untested:

my $socket; # INET something
my $out = new IO::Handle;
open($out, "|bzip2 > ./out") || die;
while (1) {
  my $buf;
  $socket->recv($buf, 64*1024, 0);
  last unless defined $buf and length $buf;
  print $out $buf;
}
close($out);

Under most OSs it is very hard to generate such amounts of information as to fill up the write cache. Least with having bzip2 in pipe-line: HDDs throughput is much higher (>50MB/s) than compression performance (in range of megabytes per second).

If you want to run it background or have several streams in parallel, have no fear to fork() and use exit() from child to signal main program how the operation went on.

To my knowledge the most useful (and probably only useful) aspect of AIO is the asynchronous reads. That can not be achieved in any other way. Using AIO to only async write makes very little sense.

Dummy00001