Akka Streams异步运行流

wkftcu5l  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(154)

我已经测试了简单的异步流,如果它异步运行,我很惊讶,它不是。我需要一些额外的配置?

@Configuration
class StreamingConfiguration
{

    @Bean
    Materializer materializer(ActorSystem actorSystem)
    {
        return ActorMaterializer.create(actorSystem);
    }

    @PostConstruct
    public void test(Materializer materializer)
    {
        var takePart = Flow.of(String.class).map(path -> {
            var start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < 3000) {}
            return path;
        });

        Source.from(Lists.newArrayList("A", "B", "C", "D"))
            .via(takePart.async())
            .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
            .run(materializer)
            .toCompletableFuture()
            .join();
    }
}

我可以看到物化程序具有默认fork-join-pool调度程序
编辑:很抱歉,你的例子也不起作用。使用mapAsync时,仍然需要12~秒才能完成。我尝试了flatMapMerge,得到了同样的结果:/

Function<String, CompletionStage<String>> blocking = s -> {
            try
            {
                Thread.sleep(3000);

            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            return CompletableFuture.completedFuture(s);
        };

        Source.from(List.of("A", "B", "C", "D"))
                .mapAsync(4, blocking)
                .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
                .run(actorSystem)
                .toCompletableFuture()
                .join();
6za6bjd0

6za6bjd01#

默认情况下,Akka Streams将流阶段具体化为单个演员:这避免了在流阶段之间传递消息的开销,但是这确实意味着在第一个元素已经通过流之前,流的第二个元素不会被消耗。
流中的async运算符意味着流将在其自己的参与者中执行。在示例代码中:

  • Source将成为参与者
  • takePart流将作为参与者
  • Sink将成为参与者

其中的每一个仍然不允许一次处理多个元素:与没有async相比,SourceSink可以在takePart有一个元素在处理的同时有一个元素在处理。在下游阶段还有一个小的隐式缓冲区,以提高吞吐量,但这通常可以忽略。
在这个流中,takePart阶段需要3秒来处理一个元素,SourceSink需要几微秒(为了便于说明,我们假设Source需要5微秒,Sink需要15微秒)。因此,大致的时间顺序是(忽略缓冲区):

  • 定时器0:takePartSource发出请求信号
  • 时间5 us:SourcetakePart发射A
  • 时间3秒+5微秒:takePartSink发射A,向Source发出需求信号
  • 时间3秒+10微秒:Source将B发射到takePart
  • 时间3秒+20微秒:Sink进程A,向takePart发出请求信号
  • 时间6秒+10微秒:takePartSink发射B,向Source发出需求信号
  • 时间6秒+15微秒:Source将C发射到takePart
  • 时间6秒+25微秒:Sink进程B,向takePart发出请求信号
  • 时间9秒+15微秒:takePartSink发射C,向Source发出需求信号
  • 时间9秒+20微秒:Source发射D至takePart
  • 时间9秒+30微秒:Sink处理C,向takePart发出请求信号
  • 时间12秒+20微秒:takePartSink发出D,向Source发出需求信号,Source完成,takePart完成
  • 时间12秒+35微秒:Sink进程D,完成

如果没有async,流将在4 * (3 sec + 20 us)中完成,因此async节省了45 us(累积起来,该流中的async将为第一个元素之后的每个元素节省15 us),因此没有太多的增益。如果交通量大到足以使高速公路饱和,则速度限制下降之前的高速公路上的速度将是速度限制下降之后的速度限制):如果async的每一端都以大致相同的速率处理元素,则可以获得最佳结果。
在Akka Streams API中,“async”还有另一种用法,这有点令人困惑,它用来表示通过获取Future s(Scala)或CompletionStage s(Java)与异步进程通信的阶段:完成X1 M51 N1 X/X1 M52 N1 X的进程可以在不同的线程上运行,并且流阶段通常包括对它将允许一次运行的X1 M53 N1 X/X1 M54 N1 X的数量的某种限制。X1 M55 N1 X是这种情况的一个示例。
在Scala中(我通常不熟悉Java未来的API),这将类似于(忽略设置隐式ExecutionContext等):

def blockOnElement(e: String): Future[String] = Future {
  Thread.sleep(3000)
  e
}

Source(List("A", "B", "C", "D"))
  .mapAsync(4)(blockOnElement)
  .runWith(Sink.fold("") { (acc, _) => acc })

这样,假设调度程序中有足够的(多于4个)线程,则整个流应该在大约3秒和80 us内完成(假设上述延迟为5/15 us)(SourceSink仍合并在每个元素上花费20 us)。
除了@Alec提到的flatMapMerge之外,在mapAsync中运行一个子流,使用Source.singleSink.head也是很有用的:宿的具体化值将是输出元素的x1M63 N1 x/x1M64 N1 x,并且x1M65 N1 x将依次保持下游排序(与x1M66 N1 x相反)。

相关问题