My original solution, https://superuser.com/questions/482953/read-non-blocking-from-multiple-fifos-in-parallel?answertab=oldest#tab-top, saves a copy of the data on disk.
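Now I have made a second version that buffers a single line in memory.
It works, but it requires every FIFO to have a writer connected before it starts. This works: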
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: *
This gives no output at all (because FIFO 100 never gets a writer):
window1$ mkfifo {1..100}
window1$ parcat {1..100} | pv >/dev/null
window2$ parallel -j0 'cat bigfile > ' ::: {1..99}
I tried opening the FIFOs with open '+<'. That solves the problem above, but now it never stops at EOF.
How can I do this?
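Both behaviours come from POSIX FIFO semantics: a read from an empty FIFO returns 0 (end-of-file) when no process has it open for writing, and fails with EAGAIN when O_NONBLOCK is set and a writer exists but has written nothing. Opening with '+<' (O_RDWR) makes the reading process itself a writer, so EOF can never be reported while that handle is open. A minimal sketch that demonstrates this, assuming Linux (POSIX leaves opening a FIFO read-write undefined; Linux allows it); the FIFO name demo is hypothetical:

```perl
#!/usr/bin/perl
# Sketch (assumes Linux FIFO semantics): a non-blocking reader sees EOF
# before any writer connects, and never sees EOF once open '+<' is used.
use strict;
use warnings;
use POSIX qw(mkfifo);
use Fcntl qw(O_RDONLY O_NONBLOCK);
use File::Temp qw(tempdir);

my $dir  = tempdir(CLEANUP => 1);
my $fifo = "$dir/demo";                     # hypothetical FIFO name
mkfifo($fifo, 0600) or die "mkfifo: $!";

# O_NONBLOCK lets open() return at once instead of waiting for a writer.
sysopen(my $in, $fifo, O_RDONLY | O_NONBLOCK) or die "open: $!";

# No process has the FIFO open for writing, so read() reports EOF (0),
# exactly as it would after a real writer had finished.
my $before = sysread($in, my $buf, 4096);
print "no writer: sysread returned $before\n";

# open '+<' (O_RDWR) makes this process a writer of the FIFO as well.
open(my $rw, '+<', $fifo) or die "open +<: $!";

# A writer now exists but has written nothing: the read fails with EAGAIN,
# and keeps failing that way for as long as $rw stays open.
my $after       = sysread($in, $buf, 4096);
my $would_block = !defined $after && $!{EAGAIN};
print "after open '+<': ", ($would_block ? "EAGAIN" : "unexpected"), "\n";
```

So a plain non-blocking open cannot tell "writer not connected yet" from "writer finished", while '+<' turns every idle FIFO into "would block" forever.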
Minimal version (no support for very long lines, and no backoff while waiting):
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
use POSIX qw(:errno_h);
use Fcntl qw(:DEFAULT :flock);
for (@ARGV) {
open($fh{$_},"<",$_) || die;
# Make the filehandle non-blocking
# fcntl with F_GETFL returns the flags (it does not fill in its third argument)
my $flags = fcntl($fh{$_}, &F_GETFL, 0) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh{$_}, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
while(keys %fh) {
for(keys %fh) {
my($string,$something_read) = non_blocking_read($_);
print $string;
}
# Sleep 1 ms
select(undef, undef, undef, 1/1000);
}
{
my %buffer;
sub non_blocking_read {
my $file = shift;
my $in = $fh{$file};
my $rv = sysread($in, substr($buffer{$file},length $buffer{$file}), 327680);
if (!$rv) {
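# undef means error ($! is set); 0 means EOF, where $! may be stale from another file
if(!defined $rv and $! == EAGAIN) {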
# Would block: Nothing read
return(undef,undef);
} else {
# This file is done
close $in;
delete $fh{$file};
my $buf = $buffer{$file};
delete $buffer{$file};
return ($buf,1);
}
}
# Find \n for full line
my $i = (rindex($buffer{$file},"\n")+1);
if($i) {
# Return full line
# Remove full line from $buffer
return(substr($buffer{$file},0,$i),
1,substr($buffer{$file},0,$i) = "");
} else {
# Something read, but not a full line
return("",1);
}
}
}
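The core of non_blocking_read is a per-file buffer from which only complete lines are released, so output from different FIFOs is never interleaved in the middle of a line. The same idea in isolation, as a sketch; the helper name take_full_lines is hypothetical, and it uses the 4-argument form of substr, which returns the removed prefix and deletes it from the buffer in one step:

```perl
#!/usr/bin/perl
use strict;
use warnings;

my %buffer;   # file name => bytes read but not yet printed

# Hypothetical helper: append a newly read chunk to the file's buffer and
# return everything up to and including the last "\n" ("" if none yet).
sub take_full_lines {
    my ($file, $chunk) = @_;
    $buffer{$file} .= $chunk;
    my $i = rindex($buffer{$file}, "\n") + 1;   # 0 when there is no newline
    return "" unless $i;
    # 4-argument substr: return the prefix and replace it with "" in place.
    return substr($buffer{$file}, 0, $i, "");
}

# Chunks arrive split at arbitrary points, as with sysread on a FIFO.
my @out = map { take_full_lines("fifo1", $_) } ("foo", "bar\nba", "z\nqu");
print "lines: ", join("", @out);
print "left over: $buffer{fifo1}\n";
```

At EOF, whatever is still in the buffer (here "qu") is returned as-is, which is what the EOF branch of non_blocking_read does.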
Full version: the important code is in the first 40 lines; the rest is well-tested code.
#!/usr/bin/perl
use Symbol qw(gensym);
use IPC::Open3;
for (@ARGV) {
open($fh{$_},"<",$_) || die;
set_fh_non_blocking($fh{$_});
}
$ms = 1;
while(keys %fh) {
for(keys %fh) {
my($string,$something_read) = non_blocking_read($_);
if($something_read) {
$ms = 0.1;
print $string;
}
}
$ms = exp_usleep($ms);
}
{
my %buffer;
my $ms;
sub non_blocking_read {
use POSIX qw(:errno_h);
my $file = shift;
my $in = $fh{$file};
my $rv = read($in, substr($buffer{$file},length $buffer{$file}), 327680);
if (!$rv) {
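# undef means error ($! is set); 0 means EOF, where $! may be stale from another file
if(!defined $rv and $! == EAGAIN) {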
# Would block: Nothing read
return(undef,undef);
} else {
# This file is done
close $in;
delete $fh{$file};
my $buf = $buffer{$file};
delete $buffer{$file};
return ($buf,1);
}
}
#### Well-tested code below
# Find \n or \r for full line
my $i = (::rindex64(\$buffer{$file},"\n")+1) ||
(::rindex64(\$buffer{$file},"\r")+1);
if($i) {
# Return full line
# Remove full line from $buffer
return(substr($buffer{$file},0,$i),
1,substr($buffer{$file},0,$i) = "");
} else {
# Something read, but not a full line
return("",1);
}
}
}
sub rindex64 {
# Do rindex on strings > 2GB.
# rindex in Perl < v5.22 does not work for > 2GB
# Input:
# as rindex except STR which must be passed as a reference
# Output:
# as rindex
my $ref = shift;
my $match = shift;
my $pos = shift;
my $block_size = 2**31-1;
my $strlen = length($$ref);
# Default: search from end
$pos = defined $pos ? $pos : $strlen;
# No point in doing extra work if we don't need to.
if($strlen < $block_size) {
return rindex($$ref, $match, $pos);
}
my $matchlen = length($match);
my $ret;
my $offset = $pos - $block_size + $matchlen;
if($offset < 0) {
# The offset is less than a $block_size
# Set the $offset to 0 and
# Adjust block_size accordingly
$block_size = $block_size + $offset;
$offset = 0;
}
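while($offset >= 0) {
$ret = rindex(
substr($$ref, $offset, $block_size),
$match);
if($ret != -1) {
return $ret + $offset;
}
# The block starting at 0 has been searched: no match
last if $offset == 0;
$offset -= ($block_size - $matchlen - 1);
# Do not step past the start of the string, or its first bytes are never searched
$offset = 0 if $offset < 0;
}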
return -1;
}
sub exp_usleep {
# Sleep this many milliseconds.
# Input:
# $ms = milliseconds to sleep
# Returns:
# $ms + 10%
my $ms = shift;
select(undef, undef, undef, $ms/1000);
return (($ms < 1000) ? ($ms * 1.1) : ($ms));
}
sub set_fh_non_blocking {
# Set filehandle as non-blocking
# Inputs:
# $fh = filehandle to make non-blocking
# Returns:
# N/A
my $fh = shift;
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
# fcntl with F_GETFL returns the flags (it does not fill in its third argument)
my $flags = fcntl($fh, &F_GETFL, 0) || die $!; # Get the current flags on the filehandle
$flags |= &O_NONBLOCK; # Add non-blocking to the flags
fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
}
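The full version differs from the minimal one mainly in its polling delay: the delay is reset to 0.1 ms after any round in which data was read, and exp_usleep grows it by 10% per round until it reaches about one second. A sketch that computes this schedule without actually sleeping (next_delay is a hypothetical copy of exp_usleep's return expression) shows how long it takes an idle reader to reach that cap:

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Same arithmetic as exp_usleep's return value, minus the select() sleep.
sub next_delay {
    my $ms = shift;
    return ($ms < 1000) ? ($ms * 1.1) : $ms;
}

my $ms     = 0.1;   # value set after a round in which data was read
my $rounds = 0;     # idle rounds until the delay stops growing
my $slept  = 0;     # total time (ms) slept during those rounds
while ($ms < 1000) {
    $slept += $ms;  # exp_usleep sleeps the current value ...
    $ms = next_delay($ms);   # ... then returns it grown by 10%
    $rounds++;
}
printf "cap reached after %d idle rounds (%.1f s asleep); delay now %.0f ms\n",
    $rounds, $slept / 1000, $ms;
```

So after roughly ten seconds without data the reader polls about once per second, and it drops back to sub-millisecond polling as soon as any FIFO delivers data.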
1 answer
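This solution opens a dummy writer, which is closed as soon as any data has been received. It does the right thing, except that it does not terminate if the input is empty.
Code: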
A better version of it is now distributed with GNU Parallel as parcat.
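As an illustration of the dummy-writer idea (not the answer's code and not the parcat implementation), a minimal sketch assuming Linux/POSIX FIFO semantics; read_fifos and the test setup are hypothetical. Each FIFO is opened non-blocking for reading, then a non-blocking dummy writer is opened on it, so an idle FIFO yields EAGAIN instead of EOF. Once data arrives from a FIFO, its dummy writer is closed, so a later EOF means the real writers are done. It shares the limitation stated above: a FIFO that never delivers any data keeps its dummy writer, and the loop never ends.

```perl
#!/usr/bin/perl
# Sketch of the dummy-writer idea (assumes Linux/POSIX FIFO semantics).
use strict;
use warnings;
use Fcntl qw(O_RDONLY O_WRONLY O_NONBLOCK);
use POSIX ();
use File::Temp qw(tempdir);

# Hypothetical: read all FIFOs in parallel, passing complete lines to $emit.
sub read_fifos {
    my ($emit, @fifos) = @_;
    my (%in, %dummy, %buf);
    for my $f (@fifos) {
        # A non-blocking open for reading returns at once, writer or not.
        sysopen($in{$f}, $f, O_RDONLY | O_NONBLOCK) or die "$f: $!";
        # A reader now exists, so a non-blocking open for writing succeeds.
        # While this dummy writer is open, an idle FIFO gives EAGAIN, not EOF.
        sysopen($dummy{$f}, $f, O_WRONLY | O_NONBLOCK) or die "$f: $!";
        $buf{$f} = "";
    }
    while (%in) {
        my $idle = 1;
        for my $f (keys %in) {
            my $rv = sysread($in{$f}, $buf{$f}, 65536, length $buf{$f});
            if (!defined $rv) {
                next if $!{EAGAIN};        # no data available right now
                die "$f: $!";
            }
            if ($rv == 0) {
                # EOF is only possible once the dummy writer is closed,
                # so the real writers are finished: flush the remainder.
                $emit->($buf{$f}) if length $buf{$f};
                close($in{$f});
                delete $in{$f};
                next;
            }
            $idle = 0;
            # Data proves a real writer connected: drop the dummy writer.
            if ($dummy{$f}) {
                close($dummy{$f});
                delete $dummy{$f};
            }
            my $i = rindex($buf{$f}, "\n") + 1;   # 0 if no complete line yet
            $emit->(substr($buf{$f}, 0, $i, "")) if $i;
        }
        select(undef, undef, undef, 0.001) if $idle;   # sleep 1 ms
    }
}

# Usage: three FIFOs whose writers connect at different times.
my $dir   = tempdir(CLEANUP => 1);
my @fifos = map { "$dir/$_" } 1 .. 3;
for my $f (@fifos) { POSIX::mkfifo($f, 0600) or die "mkfifo $f: $!" }

my @kids;
for my $n (0 .. $#fifos) {
    my $pid = fork;
    die "fork: $!" unless defined $pid;
    if ($pid == 0) {
        select(undef, undef, undef, 0.2 * $n);   # stagger the writers
        open(my $w, '>', $fifos[$n]) or die "$fifos[$n]: $!";
        print {$w} "line $n\n";
        close $w;
        POSIX::_exit(0);                         # skip File::Temp cleanup
    }
    push @kids, $pid;
}

my @got;
read_fifos(sub { push @got, $_[0] }, @fifos);
waitpid($_, 0) for @kids;
print join("", sort @got);
```

The writers are forked before any FIFO is opened in the parent, so they do not inherit a read or dummy-write handle that would keep a FIFO from reaching EOF.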