React性akka流:如何延迟图形关闭,直到源干涸?

iyr7buue  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(350)

我试图从一个kafka代理向akka流图提供数据(比如10个字符串)
下面是unittest中的图表:

val consumer = AlpakkaConsumer(KafkaBootstrapServer(kafkaURL).value, "porteur-id")

val drainingControl =
  Consumer
    .plainSource(consumer.kafkaConsumerSettings, Subscriptions.topics("transaction"))
    .toMat(Sink.seq)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()
Await.result(streamComplete, Duration.Inf).size should be > 0

unittest失败(大小等于0,应为10)
但是,如果我插入 Thread.sleep(5000) 在调用drainandshutdown()之前,它会通过。
它给我的印象是,在没有睡眠的情况下,在run()调用之后,图形是直接关闭的,它甚至没有时间处理第一条消息。如果我引入一个sleep命令,它可以遍历所有源代码内容,关闭图形,最后我得到了一些东西。
我不知道我做错了什么,因为这基本上是一个片段,你可以在任何地方找到,并作为一个例子
仅当源已传递所有消息时,如何触发drainandshutdown调用?
谢谢你的帮助!!
注意:我使用的是akka stream kafka 0.22版本

jum4pzuy

jum4pzuy1#

根据Kafka的文件, drainAndShutdown 行为是:
停止从源生成消息,等待流完成并关闭使用者源,以便所有消耗的消息到达流的末尾。
这意味着您将停止在消费者中接收消息,并且它将只处理已经排队的消息。
对于测试流,我建议看一下streams testkit,因为它可以让您控制要测试的流的哪些部分

相关问题