
kzmpq1sx  于 2023-03-09  发布在  Perl

我最初的www.example.com解决方案将数据副本保存在磁盘上。 saves a copy of the data on disk.

window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: *


window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: {1..99}

我尝试使用open '+<'。这解决了上述问题,但现在它不会在EOF处停止。


use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);

for (@ARGV) {
    open($fh{$_},"<",$_) || die;
    # set fh non blocking($fh{$_});
    my $flags;
    fcntl($fh{$_}, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags
    fcntl($fh{$_}, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle

while(keys %fh) {
    for(keys %fh) {
        my($string,$something_read) = non_blocking_read($_);
    print $string;
    # Sleep 1 ms
    select(undef, undef, undef, 1/1000);

    my %buffer;

    sub non_blocking_read {

        my $file = shift;
        my $in = $fh{$file};
        my $rv = sysread($in, substr($buffer{$file},length $buffer{$file}), 327680);
        if (!$rv) {
            if($! == EAGAIN) {
            # Would block: Nothing read
            } else {
            # This file is done
            close $in;
            delete $fh{$file};
            my $buf = $buffer{$file};
            delete $buffer{$file};
            return ($buf,1);

        # Find \n for full line
        my $i = (rindex($buffer{$file},"\n")+1);
        if($i) {
            # Return full line
            # Remove full line from $buffer
               1,substr($buffer{$file},0,$i) = "");
        } else {
            # Something read, but not a full line



use Symbol qw(gensym);
use IPC::Open3;

for (@ARGV) {
    open($fh{$_},"<",$_) || die;

$ms = 1;
while(keys %fh) {
    for(keys %fh) {
    my($string,$something_read) = non_blocking_read($_);
    if($something_read) {
        $ms = 0.1;
        print $string;
    $ms = exp_usleep($ms);

    my %buffer;
    my $ms;

    sub non_blocking_read {
    use POSIX qw(:errno_h);

    my $file = shift;
    my $in = $fh{$file};
    my $rv = read($in, substr($buffer{$file},length $buffer{$file}), 327680);
    if (!$rv) {
        if($! == EAGAIN) {
        # Would block: Nothing read
        } else {
        # This file is done
        close $in;
        delete $fh{$file};
        my $buf = $buffer{$file};
        delete $buffer{$file};
        return ($buf,1);

    #### Well-tested code below                                                           

    # Find \n or \r for full line
    my $i = (::rindex64(\$buffer{$file},"\n")+1) ||
    if($i) {
        # Return full line
        # Remove full line from $buffer
           1,substr($buffer{$file},0,$i) = "");
    } else {
        # Something read, but not a full line

sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB
    # Input:
    #   as rindex except STR which must be passed as a reference
    # Output:
    #   as rindex
    my $ref = shift;
    my $match = shift;
    my $pos = shift;
    my $block_size = 2**31-1;
    my $strlen = length($$ref);
    # Default: search from end
    $pos = defined $pos ? $pos : $strlen;
    # No point in doing extra work if we don't need to.
    if($strlen < $block_size) {
    return rindex($$ref, $match, $pos);

    my $matchlen = length($match);
    my $ret;
    my $offset = $pos - $block_size + $matchlen;
    if($offset < 0) {
    # The offset is less than a $block_size
    # Set the $offset to 0 and
    # Adjust block_size accordingly
    $block_size = $block_size + $offset;
    $offset = 0;
    while($offset >= 0) {
    $ret = rindex(
        substr($$ref, $offset, $block_size),
    if($ret != -1) {
        return $ret + $offset;
    $offset -= ($block_size - $matchlen - 1);
    return -1;

sub exp_usleep {
    # Sleep this many milliseconds.
    # Input:
    #   $ms = milliseconds to sleep
    # Returns:
    #   $ms + 10%
    my $ms = shift;
    select(undef, undef, undef, $ms/1000);
    return (($ms < 1000) ? ($ms * 1.1) : ($ms));

sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be blocking
    # Returns:
    #   N/A
    my $fh = shift;
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
    my $flags;
    fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags
    fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle



mkfifo {1..100}
parcat {1..100} &
parallel -j2 echo works '>' {} ::: {1..100}

parcat {1..100} &
# Fails (parcat does not exit)
parallel -j2 cat /dev/null '>' {} ::: {1..100}


use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use IO::Select;
use strict;

my $s = IO::Select->new();
my %fhr;
my %fhw;

for (@ARGV) {
    # Open the file with a fake writer that will never write
    open(my $fhw,"+<",$_) || die;
    # Open the file for real
    open(my $fhr,"<",$_) || die;

my %buffer;
while(keys %fhr) {
    for my $file ($s->can_read(undef)) {
        my $rv = sysread($file, substr($buffer{$file},length $buffer{$file}), 327680);
        if (!$rv) {
            if($! == EAGAIN) {
            # Would block: Nothing read
            } else {
            # This file is done
            delete $fhr{$file};
        print $buffer{$file};
            delete $buffer{$file};
        # Closing the $file causes it to block
        # close $file;
    if($fhw{$file}) {
        # We have received data from $file:
        # Close the fake writer
        close $fhw{$file};
        delete $fhw{$file};
        # Find \n or \r for full line
        my $i = (::rindex64(\$buffer{$file},"\n")+1) ||
        if($i) {
            # Print full line
            # Remove full line from $buffer
        print substr($buffer{$file},0,$i);
        substr($buffer{$file},0,$i) = "";
        } else {
            # Something read, but not a full line

sub rindex64 {
    # Do rindex on strings > 2GB.
    # rindex in Perl < v5.22 does not work for > 2GB
    # Input:
    #   as rindex except STR which must be passed as a reference
    # Output:
    #   as rindex
    my $ref = shift;
    my $match = shift;
    my $pos = shift;
    my $block_size = 2**31-1;
    my $strlen = length($$ref);
    # Default: search from end
    $pos = defined $pos ? $pos : $strlen;
    # No point in doing extra work if we don't need to.
    if($strlen < $block_size) {
        return rindex($$ref, $match, $pos);
    my $matchlen = length($match);
    my $ret;
    my $offset = $pos - $block_size + $matchlen;
    if($offset < 0) {
        # The offset is less than a $block_size
        # Set the $offset to 0 and
        # Adjust block_size accordingly
        $block_size = $block_size + $offset;
        $offset = 0;
    while($offset >= 0) {
        $ret = rindex(
            substr($$ref, $offset, $block_size),
        if($ret != -1) {
            return $ret + $offset;
        $offset -= ($block_size - $matchlen - 1);
    return -1;

sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be blocking
    # Returns:
    #   N/A
    my $fh = shift;
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
    my $flags;
    fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
    $flags |= &O_NONBLOCK; # Add non-blocking to the flags
    fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle

它的一个更好的版本现在作为parcat随GNU Parallel一起分发。
