Perl: read a file in chunks and process the chunks in parallel

Posted by 9rygscc1 on 2022-11-15 in Perl

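Basically, I want to read a file into a hash, but the file is huge and does not fit into RAM. So I split it into chunks, process the data (the search_f2 sub), and then read the next chunk. This seems to work, but of course it uses only one core. Is there an easy way to fork the search_f2 sub? I tried a simple approach with Parallel::ForkManager, but as far as I can see it does not work. Any hints? I don't actually need anything returned from the forked sub; it is enough if it prints its results to STDOUT. file1 has the following structure (basically the output of the tar -tf command):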

tarfile1.tar
gzip1.gz
<skip>
gzipX.gz
<skip>
tarfileX.tar
<some random number of gz files>

file2 is just a plain newline-separated list of gzipX.gz files.
Perl code:

#!/usr/bin/perl
use strict;
use warnings;
use feature qw(say);
use Data::Dumper;
use Parallel::ForkManager;

my $file1 = $ARGV[0] // die "Need a file as argument";
my $file2 = $ARGV[1] // die "Need a file as argument";

my $fd1 = read_f($file1);
my %hdata;
my $tarfile;
my $index = 0;

my $pm = Parallel::ForkManager->new(10);
while (my $line = <$fd1>) {
    chomp $line;
    if ( $line =~ m/^somepattern.*tar$/ ){
        $tarfile = $line;
        $index++;
    }
    if (++$index >= '100') {
        say "\tForking a process";
        my $pid = $pm->start and do {
            $index = 0;
            %hdata = ();
            next;
        };
        search_f2(\%hdata);
        $pm->finish;
    }
    push @{$hdata{$tarfile}},$line if $line =~ m/.*\.gz$/;
}
close $fd1;

#last search
search_f2(\%hdata);

sub search_f2{
    my ($h) = @_;
    my %hdata = %$h;
    my $fd2 = read_f($file2);
    while (my $ciffile  = <$fd2>) {       
        chomp $ciffile;                   
        foreach my $tarfile (keys %hdata) {  
            my $values = $hdata{$tarfile};   
            if (grep (/$ciffile/, @$values)) {
                say "$tarfile";
                delete $hdata{$tarfile};
                last;
            }
        }
    }
    close $fd2;
    return;
}

sub read_f {
    my $file = shift;
    die "Can't open file $file: $!\n" if ! -e $file;
    # gzip happily parses plain files as well
    open my $fh, "pigz -fdc $file|" or die "Can't open file $file: $!\n";
    return $fh if $fh;
}

Answer #1 (wmvff8tz)

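As I understand it, the task is this: read a certain number of lines from a file and process each such chunk of text in its own fork. I am not quite sure about some details in the question, so here is a basic demo that will hopefully serve as a template.
It keeps the number of processes at 3 and processes batches ("chunks") of 2 lines in each.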

use warnings;
use strict;
use feature qw(say state);
use Parallel::ForkManager;

my $file = shift // die "Usage: $0 filename\n";

my $pm = Parallel::ForkManager->new(3);

open my $fh, '<', $file or die $!; 

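my ($chunk, $num_lines);

while (my $line = <$fh>) {
    chomp $line;
    say "Processing line: |$line|";

    # Accumulate lines (newlines already removed) into the current chunk
    $chunk .= $line;

    if (++$num_lines >= 2) {
        say "\tForking a process";

        # start() returns the child's PID in the parent and 0 in the child.
        # The parent resets its state here and moves on to the next line.
        $pm->start and do {
            $num_lines = 0;
            $chunk = '';
            next;
        };
        # Only the child gets here
        proc_chunk($chunk);
        $pm->finish;    # the child exits here
    }
}
$pm->wait_all_children;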

sub proc_chunk {
    my ($chunk) = @_; 
    my $line_nos = join ' ', $chunk =~ /#([0-9]+)/g; 
    say "\t\tin a fork, processing chunk with lines: $line_nos";
    sleep 10; 
    say "\t\t\t... done with fork";
}

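With P::FM, $pm->start and next; forks a process and the parent immediately jumps to the next iteration of the loop, so any variables that need resetting must be reset right there. I use a do { ... } block for that.†
The child sleeps so that we can see a group of forks exit practically together, which happens here because the processing itself is so quick. (In fact, P::FM forks a new process as soon as one finishes, to keep the given number of processes running. It does not wait for the whole batch to finish first, unless wait_for_available_procs is used.)
This prints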

Processing line: |This is line #1|
Processing line: |This is line #2|
        Forking a process
Processing line: |This is line #3|
Processing line: |This is line #4|
        Forking a process
Processing line: |This is line #5|
Processing line: |This is line #6|
        Forking a process
Processing line: |This is line #7|
Processing line: |This is line #8|
        Forking a process
                in a fork, processing chunk with lines: 1 2
                in a fork, processing chunk with lines: 3 4
                in a fork, processing chunk with lines: 5 6
                        ... done with fork
                        ... done with fork
                        ... done with fork
Processing line: |This is line #9|
Processing line: |This is line #10|
        Forking a process
Processing line: |This is line #11|
Processing line: |This is line #12|
        Forking a process
Processing line: |This is line #13|
Processing line: |This is line #14|
        Forking a process
                in a fork, processing chunk with lines: 7 8
                in a fork, processing chunk with lines: 9 10
                in a fork, processing chunk with lines: 11 12
                        ... done with fork
                        ... done with fork
                        ... done with fork
Processing line: |This is line #15|
Processing line: |This is line #16|
        Forking a process
Processing line: |This is line #17|
Processing line: |This is line #18|
        Forking a process
Processing line: |This is line #19|
                in a fork, processing chunk with lines: 13 14
Processing line: |This is line #20|
        Forking a process
                in a fork, processing chunk with lines: 15 16
                in a fork, processing chunk with lines: 17 18
^C
[...etc...]
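
The output above shows forks exiting in groups only because every child sleeps for the same time. If whole batches really are wanted, one option is to call wait_for_available_procs with the full slot count whenever all slots are taken. The following is a minimal sketch, assuming a version of Parallel::ForkManager that provides wait_for_available_procs and running_procs; the job count of 7 and the names $max_procs and $batches are made up for illustration.

```perl
use strict;
use warnings;
use feature qw(say);
use Parallel::ForkManager;

my $max_procs = 3;    # hypothetical slot count, matching the demo
my $pm = Parallel::ForkManager->new($max_procs);

my $batches = 0;      # parent-side counter, for illustration only

for my $job (1 .. 7) {
    # When every slot is taken, block until all of them are free again,
    # so that jobs start in whole batches of $max_procs.
    if ($pm->running_procs >= $max_procs) {
        $pm->wait_for_available_procs($max_procs);
        ++$batches;
    }

    $pm->start and next;         # parent: go on to the next job
    say "job $job in pid $$";    # child: stand-in for the real work
    $pm->finish;                 # child exits here
}
$pm->wait_all_children;
```

Without the explicit wait, start would block only until a single slot is free, which is the default behaviour described above.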

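† The original version of the question did not have the do { } block, only ... and next, so the parent bailed out directly. (I see that the question has since been edited to include it.)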
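
The demo dispatches a chunk only once it holds two lines, so a leftover line at the end of an odd-length file would never be processed. The sketch below is one possible way to handle that, not part of the original answer: it collects lines in an array and also dispatches when eof reports the end of input. The in-memory filehandle, the five sample lines, and the names @chunk, @batch, $forks and $chunk_size are assumptions for illustration.

```perl
use strict;
use warnings;
use feature qw(say);
use Parallel::ForkManager;

my $chunk_size = 2;    # lines per chunk, as in the demo
my $pm = Parallel::ForkManager->new(3);

# In-memory stand-in for the input file: five lines, so one is left over.
my $text = join '', map { "This is line #$_\n" } 1 .. 5;
open my $fh, '<', \$text or die "Can't open in-memory handle: $!";

my @chunk;
my $forks = 0;         # counted in the parent only

while (my $line = <$fh>) {
    chomp $line;
    push @chunk, $line;

    # Dispatch on a full chunk, or at end of input if lines are left over.
    next unless @chunk >= $chunk_size || eof($fh);

    my @batch = @chunk;    # what the child will work on
    @chunk = ();           # reset before forking, so no do { } block is needed
    ++$forks;

    $pm->start and next;   # parent: keep reading
    proc_chunk(\@batch);   # child: process its copy of the chunk
    $pm->finish;           # child exits here
}
close $fh;
$pm->wait_all_children;

sub proc_chunk {
    my ($lines) = @_;
    say "in a fork, processing ", scalar(@$lines), " line(s): ",
        join ' | ', @$lines;
}
```

Because the parent's state is reset before the fork, the short $pm->start and next; form is enough here. The child still sees its chunk through the @batch copy made before the reset.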
