如何测试偏移量是否已提交给kafka

cgh8pdjw  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(424)

我有一个阿克卡流Kafka的来源,是从Kafka的主题阅读。
我有一个简单的任务,允许禁用消息偏移量的提交。提交通常通过调用commitscaladsl来完成。
我的问题是我不知道如何测试是否提交了偏移量。
我们通常使用embeddedkafka进行测试,但我还没有找到一种方法来请求最后提交的偏移量。
这是我写的测试的一个例子:

"KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }

现在我想添加一个检查是否提交了偏移量。

shstlldc

shstlldc1#

Kafka按主题名和分区ID存储偏移量。所以你可以用 .committed() 或者 .position 方法来检查kafka使用者的最后提交的偏移量或当前位置。
committed():获取给定分区的上一次提交的偏移量(无论提交是由这个进程还是另一个进程进行的)。
position():获取要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。

wribegjk

wribegjk2#

我最终用一个consumerinterceptor解决了这个问题,定义如下:

class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
  override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records

  override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    import scala.collection.JavaConverters._
    OffsetRecorder.add(offsets.asScala)
  }

  override def close(): Unit = {}

  override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear

}

oncommit在提交完成时被调用,在本例中,我只记录它。我使用configure方法在每个测试开始时都有空记录。
然后,在为源创建使用者设置时,我将拦截器添加为属性:

ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(s"localhost:${kafkaPort}")
    .withGroupId("group-id")
    .withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")

相关问题