我有一个阿克卡流Kafka的来源,是从Kafka的主题阅读。
我有一个简单的任务,允许禁用消息偏移量的提交。提交通常通过调用commitscaladsl来完成。
我的问题是我不知道如何测试是否提交了偏移量。
我们通常使用embeddedkafka进行测试,但我还没有找到一种方法来请求最后提交的偏移量。
这是我写的测试的一个例子:
"KafkaSource" should {
"consume from a kafka topic and pass the message " in {
val commitToKafka = true
val key = "key".getBytes
val message = "message".getBytes
withRunningKafka {
val source = getKafkaSource(commitToKafka)
val (_, sub) = source
.toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
.run()
val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
messageOpt should not be empty
messageOpt.get.value shouldBe message
}
}
现在我想添加一个检查是否提交了偏移量。
2条答案
按热度按时间shstlldc1#
Kafka按主题名和分区ID存储偏移量。所以你可以用
.committed()
或者.position
方法来检查kafka使用者的最后提交的偏移量或当前位置。committed():获取给定分区的上一次提交的偏移量(无论提交是由这个进程还是另一个进程进行的)。
position():获取要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。
wribegjk2#
我最终用一个consumerinterceptor解决了这个问题,定义如下:
oncommit在提交完成时被调用,在本例中,我只记录它。我使用configure方法在每个测试开始时都有空记录。
然后,在为源创建使用者设置时,我将拦截器添加为属性: