kafkaconsumer.commitasync()行为,其偏移量低于上一个

bnlyeluc  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(344)

Kafka将如何处理一个电话 KafkaConsumer.commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) 当一个主题的偏移量值比前一个调用的值小时?

oprakyz7

oprakyz71#

我很好奇并测试了它的行为。正如文件中所写,@haoyuwang在他的回答中所写的是正确的(+1)。
背后的原因很简单。使用者组的提交偏移量存储在内部主题的kafka中 __consumer_offsets . 这个主题是 compact 这意味着它要为给定的键提供最新的值。在您的例子中,键是使用者组、主题和分区的组合,而您的值是偏移量。
如果你现在
提交偏移量10,由于稍后的异步进程
提交偏移量5
偏移量5将是 __consumer_offsets 主题。这意味着消费者将从该主题分区读取的下一个偏移量是偏移量6,而不是偏移量11。

如何繁殖

您可以通过(同步地)在常规提交之后提交一个较早的偏移量来复制并测试它,如下所示:

consumer.commitSync();
consumer.commitSync(commitFirstMessage);

哪里 commitFirstMessage 定义为

TopicPartition zeroTopicPartition = new TopicPartition(topic, 0);
OffsetAndMetadata zeroOffset = new OffsetAndMetadata(0L);

Map<TopicPartition, OffsetAndMetadata> commitFirstMessage = new HashMap<>();
commitFirstMessage.put(zeroTopicPartition, zeroOffset);

编辑:

如何避免使用commitasync提交较低的偏移量

在《Kafka-权威指南》一书中,有一条建议,即避免由于重试调用而提交较低的偏移量 commitAsync :
重试异步提交:为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
一个实现可能看起来像这样(实际上没有测试过!):

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}
omhiaaxx

omhiaaxx2#

它只需将分区的偏移量设置为您指定的值,因此下次您将使用来自committedfost+1的消息。
的javadoc commitAsync() 说:
提交的偏移量应该是应用程序将使用的下一条消息,即lastprocessedmessageoffset+1。

相关问题