如何使用嵌入的kafka库测试kafka消费者,确切地说是使用'withrunningkafka'?

gg58donl  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(370)

我应该测试我的代码,通过嵌入的'withrunningkafka'使用kafka服务器的所有消息,如下所示:https://github.com/manub/scalatest-embedded-kafka
我尝试通过创建的嵌入式生产者向主题发送消息。
我尝试通过我在项目中的代码来使用生成的消息(由嵌入式生产者创建)。
“与定制生产商和消费者进行测试”应{

"work" in {

    withRunningKafka {

      1. val producer: KafkaProducer[String, String] =
               aKafkaProducer[String](valueSerializer, config)

         val topic = "topic-to-test"

         producer.send(new ProducerRecord[String, String](topic, "some message 1"))
         producer.send(new ProducerRecord[String, String](topic, "some message 2"))
         producer.close()

      2. val ok: Future[Done] = Consumer
        .committableSource(
            consumerSettings,
            Subscriptions.topics(topic))
        .map(msg => println(msg.record.value()))
        .runWith(Sink.ignore)

       ok should be (Done)
    }
}}

问题就在这里:“ok”不会给出“done”的结果。一般来说,我测试消费者的逻辑正确吗?

ws51t4hk

ws51t4hk1#

我认为你同时面临两个问题:
kafka消费者无限地等待元素(正如@dvim所说的),所以您需要.take()才能让它真正结束
默认情况下,kafka使用者组将从当前主题的末尾开始,而不是从开头开始,因此不会使用在主题旋转之前发布的消息。你需要一个设置,使它从主题的开始而不是结束。

6ioyuze2

6ioyuze22#

欢迎来到stackoverflow!
原因是什么 ok 从未完成并产生结果,因为源正在等待可能的进一步消息。添加 .take(2) 在Map之前,源将在两个元素之后停止 ok 未来将完成。

相关问题