akka streams kafka-消费者单元测试

xj3cbfub  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(428)

我试图通过设置kafka服务器并使用producer发送消息来在本地测试我的代码,但是我想知道是否有一种方法可以为这段代码编写单元测试(测试使用者收到的消息是否正确)。

  1. val consumerSettings = ConsumerSettings(system,
  2. new ByteArrayDeserializer, new StringDeserializer)
  3. .withBootstrapServers("localhost:9092")
  4. .withGroupId("group1")
  5. .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  6. val done = Consumer.committableSource(consumerSettings,
  7. Subscriptions.topics("topic1"))
  8. .map { msg =>
  9. msg.committableOffset.commitScaladsl()
  10. }
  11. .runWith(Sink.ignore)
s4chpxco

s4chpxco1#

您可以使用以下工具测试代码:
缩放试验
scalatest embedded kafka:支持创建使用scalatest的内存中kafka示例。
akka streams测试套件:提供 TestSink.probe 它可以检查并控制流元素(即消息)的需求。
akka-streams-kafka项目在自己的测试中使用了上述方法。看看它的 IntegrationSpec ,您可以根据自己的需要进行调整。

相关问题