在独立Perl脚本中使用Mojo::Promise

xfb7svmp  于 2022-11-15  发布在  Perl
关注(0)|答案(2)|浏览(217)

在一个独立的Perl脚本中,我需要并发调用一个外部Web服务(在下面的$number+1中模拟)。
我愿意在Perl中使用更好的方法(队列?)。

#!/usr/bin/env perl

use Mojo::IOLoop;
use Mojo::Promise;
use Future::Utils 'fmap_concat';

####### STASH ######
my $result_future; #
####################

my $count = 100;
my @numbers = 1..$count;

if (@numbers) {
    my $result_f = fmap_concat {
        my $number = shift;
        my $p = Mojo::Promise->new;
        Mojo::IOLoop->subprocess(
            sub {
                #sleep 2;
                return $number+1;
            },
            sub {
                my ($subprocess, $err, @result) = @_;
                return $p->reject($err) if $err;
                $p->resolve(@result);
            });
        return $p->with_roles('Mojo::Promise::Role::Futurify')->futurify;
    } foreach => \@numbers, concurrent => 20;

    $result_f
        ->on_done(
        sub {
            my @values = @_;
            foreach my $response (@values) {
                print STDERR "Response='$response'\n";
            }
        })
        ->on_fail(
        sub {
            my $error = shift;
            print STDERR "# ERROR='$error'\n";
        })
        ->on_ready(
        sub {
            $result_future = undef;
        })
        ;

    # Must keep a reference to Futures until we're done with them.
    $result_future = $result_f;
}
print STDERR "Processing $count numbers\n"
aij0ehis

aij0ehis1#

由于您没有在已经运行的事件循环中使用此代码,因此您需要自己运行事件循环,以便发生任何事情。最简单的方法是等待Futures完成。

$result_f->await;

在这种情况下不需要保留一个单独的$result_future引用,因为这个语句将阻塞直到Futures准备好。它和相关的->get(就是->await,如果成功则返回结果,如果失败则抛出异常)是Futures允许你以顺序方式运行非阻塞代码的主要机制。

5sxhfpxr

5sxhfpxr2#

作为OP中所要求的一种替代方法,还有Promise::Me,一个基于fork的promise。

my $p = Promise::Me->new(sub
{
    # Some code to execute
})->then(sub
{
    my( $resolve, $reject ) = @$_;
    my $res = shift( @_ ); # return value from the code executed above
    # more processing...
    # You can use $resolve->( $some_data ); to resolve or $reject->( $some_exception ); to reject.
})->then(sub
{
    my $more = shift( @_ ); # return value from the previous then
    # more processing...
})->catch(sub
{
    my $exception = shift( @_ ); # error that occured is caught here
})->finally(sub
{
    # final processing
})->then(sub
{
    # A last then may be added after finally
};

作为充分披露,我是Promise::Me的作者。

相关问题