我已经测试了简单的异步流,如果它异步运行,我很惊讶,它不是。我需要一些额外的配置?
@Configuration
class StreamingConfiguration
{
@Bean
Materializer materializer(ActorSystem actorSystem)
{
return ActorMaterializer.create(actorSystem);
}
@PostConstruct
public void test(Materializer materializer)
{
var takePart = Flow.of(String.class).map(path -> {
var start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 3000) {}
return path;
});
Source.from(Lists.newArrayList("A", "B", "C", "D"))
.via(takePart.async())
.toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
.run(materializer)
.toCompletableFuture()
.join();
}
}
我可以看到物化程序具有默认fork-join-pool调度程序
编辑:很抱歉,你的例子也不起作用。使用mapAsync
时,仍然需要12~秒才能完成。我尝试了flatMapMerge
,得到了同样的结果:/
Function<String, CompletionStage<String>> blocking = s -> {
try
{
Thread.sleep(3000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
return CompletableFuture.completedFuture(s);
};
Source.from(List.of("A", "B", "C", "D"))
.mapAsync(4, blocking)
.toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
.run(actorSystem)
.toCompletableFuture()
.join();
1条答案
按热度按时间6za6bjd01#
默认情况下,Akka Streams将流阶段具体化为单个演员:这避免了在流阶段之间传递消息的开销,但是这确实意味着在第一个元素已经通过流之前,流的第二个元素不会被消耗。
流中的
async
运算符意味着流将在其自己的参与者中执行。在示例代码中:Source
将成为参与者takePart
流将作为参与者Sink
将成为参与者其中的每一个仍然不允许一次处理多个元素:与没有
async
相比,Source
和Sink
可以在takePart
有一个元素在处理的同时有一个元素在处理。在下游阶段还有一个小的隐式缓冲区,以提高吞吐量,但这通常可以忽略。在这个流中,
takePart
阶段需要3秒来处理一个元素,Source
和Sink
需要几微秒(为了便于说明,我们假设Source
需要5微秒,Sink
需要15微秒)。因此,大致的时间顺序是(忽略缓冲区):takePart
向Source
发出请求信号Source
向takePart
发射AtakePart
向Sink
发射A,向Source
发出需求信号Source
将B发射到takePart
Sink
进程A,向takePart
发出请求信号takePart
向Sink
发射B,向Source
发出需求信号Source
将C发射到takePart
Sink
进程B,向takePart
发出请求信号takePart
向Sink
发射C,向Source
发出需求信号Source
发射D至takePart
Sink
处理C,向takePart
发出请求信号takePart
向Sink
发出D,向Source
发出需求信号,Source
完成,takePart
完成Sink
进程D,完成如果没有
async
,流将在4 * (3 sec + 20 us)
中完成,因此async
节省了45 us(累积起来,该流中的async
将为第一个元素之后的每个元素节省15 us),因此没有太多的增益。如果交通量大到足以使高速公路饱和,则速度限制下降之前的高速公路上的速度将是速度限制下降之后的速度限制):如果async
的每一端都以大致相同的速率处理元素,则可以获得最佳结果。在Akka Streams API中,“async”还有另一种用法,这有点令人困惑,它用来表示通过获取
Future
s(Scala)或CompletionStage
s(Java)与异步进程通信的阶段:完成X1 M51 N1 X/X1 M52 N1 X的进程可以在不同的线程上运行,并且流阶段通常包括对它将允许一次运行的X1 M53 N1 X/X1 M54 N1 X的数量的某种限制。X1 M55 N1 X是这种情况的一个示例。在Scala中(我通常不熟悉Java未来的API),这将类似于(忽略设置隐式
ExecutionContext
等):这样,假设调度程序中有足够的(多于4个)线程,则整个流应该在大约3秒和80 us内完成(假设上述延迟为5/15 us)(
Source
和Sink
仍合并在每个元素上花费20 us)。除了@Alec提到的
flatMapMerge
之外,在mapAsync
中运行一个子流,使用Source.single
和Sink.head
也是很有用的:宿的具体化值将是输出元素的x1M63 N1 x/x1M64 N1 x,并且x1M65 N1 x将依次保持下游排序(与x1M66 N1 x相反)。