kafka consumer offset提交检查,以避免提交较小的偏移

ebdffaop  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(364)

我们假设有一个消费者发送了提交偏移量10的请求。如果有一个沟通问题,经纪人没有得到请求,当然没有回应。之后,我们有另一个消费者处理另一批,并成功地提交抵消20。
问:我想知道是否有一种方法或属性可以处理,这样我们就可以在提交偏移量20之前检查日志中的前一个偏移量是否已提交?

5vf7fwbs

5vf7fwbs1#

您描述的场景只能在使用异步提交时发生。
请记住,一个特定的主题分区只能由同一个consumergroup中的单个使用者使用。如果您有两个消费者在阅读同一主题分区,那么这是唯一可能的
如果他们有不同的消费群体,或者
如果他们拥有相同的消费者群体,就会发生重新平衡。但是,一次只能有一个使用者读取该主题分区,而不会同时读取这两个分区。
案例1非常清楚:如果他们有不同的ConsumerGroup,那么他们并行地、独立地使用分区。此外,他们的补偿是分开管理的。
案例#2:如果第一个使用者由于失败/死亡且未恢复而未能提交偏移量10,则将发生使用者重新平衡,另一个活动使用者将拾取该分区。由于未提交偏移量10,新的使用者将再次开始读取偏移量10,然后跳到下一批并可能提交偏移量20。这导致“至少一次”语义,并可能导致重复。
现在,我们来看一个场景,在提交一个更高的偏移量之后,您可以提交一个更小的偏移量。如开头所说,如果异步提交偏移量(使用 commitAsync ). 设想以下场景,按时间排序:
使用者读取偏移量0(后台线程尝试提交偏移量0)
提交偏移量0成功
使用者读取偏移量1(后台线程尝试提交偏移量1)
提交偏移量1失败,请稍后再试
使用者读取偏移量2(后台线程尝试提交偏移量2)
提交偏移量2成功
现在,该怎么办(你要提交偏移量1吗?)
如果让重试机制再次提交偏移量1,则看起来您的使用者只提交了偏移量1。这是因为最新偏移量par topicpartition上每个消费者组的信息都存储在内部压缩的kafka topic\u consumer\u offsets中,它只存储我们消费者组的最新值(在本例中:offset 1)。
在《Kafka-权威指南》一书中,有一个关于如何缓解这个问题的提示:
重试异步提交:为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
例如,您可以在下面的scala中看到这个想法的实现:

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}

相关问题