kafka异步提交偏移量复制

dauxcl2d  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(745)

我们偶尔会遇到副本领头节点和其余isr节点之间的高延迟,这会导致使用者出现以下错误:

  1. org.apache.kafka.clients.consumer.RetriableCommitFailedException: Commit offsets failed with retriable exception. You should retry committing offsets.
  2. Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.

我可以增加 offsets.commit.timeout.ms 但我不想,因为这可能会导致额外的副作用。但从更广泛的Angular 来看,我不希望代理等待同步所有其他副本上的提交偏移量,而是本地提交并异步更新其余副本。查看我找到的代理配置 offsets.commit.required.acks 它看起来就是这样配置的,但文档还神秘地指出: the default (-1) should not be overridden .
为什么?我甚至试过查看代理的源代码,但没有发现什么额外的信息。
你知道为什么不推荐这个吗?有没有不同的方法来达到相同的结果?

yyyllmsg

yyyllmsg1#

我建议实际重试提交偏移量。
让使用者异步提交偏移量并实现重试机制。但是,重试异步提交可能会导致在提交较大的偏移量之后再提交较小的偏移量的情况,这应该尽可能避免。
在《Kafka-权威指南》一书中,有一个关于如何缓解这个问题的提示:
重试异步提交:为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
例如,您可以在下面的scala中看到这个想法的实现:

  1. import java.util._
  2. import java.time.Duration
  3. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
  4. import org.apache.kafka.common.{KafkaException, TopicPartition}
  5. import collection.JavaConverters._
  6. object AsyncCommitWithCallback extends App {
  7. // define topic
  8. val topic = "myOutputTopic"
  9. // set properties
  10. val props = new Properties()
  11. props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  12. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  13. // [set more properties...]
  14. // create KafkaConsumer and subscribe
  15. val consumer = new KafkaConsumer[String, String](props)
  16. consumer.subscribe(List(topic).asJavaCollection)
  17. // initialize global counter
  18. val atomicLong = new AtomicLong(0)
  19. // consume message
  20. try {
  21. while(true) {
  22. val records = consumer.poll(Duration.ofMillis(1)).asScala
  23. if(records.nonEmpty) {
  24. for (data <- records) {
  25. // do something with the records
  26. }
  27. consumer.commitAsync(new KeepOrderAsyncCommit)
  28. }
  29. }
  30. } catch {
  31. case ex: KafkaException => ex.printStackTrace()
  32. } finally {
  33. consumer.commitSync()
  34. consumer.close()
  35. }
  36. class KeepOrderAsyncCommit extends OffsetCommitCallback {
  37. // keeping position of this callback instance
  38. val position = atomicLong.incrementAndGet()
  39. override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
  40. // retrying only if no other commit incremented the global counter
  41. if(exception != null){
  42. if(position == atomicLong.get) {
  43. consumer.commitAsync(this)
  44. }
  45. }
  46. }
  47. }
  48. }
展开查看全部

相关问题