spark:在批处理结束时提交kafka偏移量

guicsvcw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(419)

版本:
Spark2.2
Kafka0.11
根据Kafka中提交补偿的文档,我应该使用: stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) 因此,偏移量仅在下一批开始时提交。这会导致“持续”滞后。
在当前批处理结束时提交偏移量是否有任何解决方法(但仍然对偏移量使用相同的kafka组)?
滞后监测示例:

xzv2uavs

xzv2uavs1#

在当前批处理结束时是否有提交偏移量的解决方法
不是通过 commitAsync 应用程序编程接口。方法调用所做的是将要提交的偏移量排队,然后在 DirectKafkaInputDStream.compute 异步提交:

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())

  // Create KafkaRDD and other irrelevant code

  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}

哪里 commitAll 只需轮询正在被填满的队列 commitAsync :

protected def commitAll(): Unit = {
  val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
  var osr = commitQueue.poll()
  while (null != osr) {
    val tp = osr.topicPartition
    val x = m.get(tp)
    val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
    m.put(tp, new OffsetAndMetadata(offset))
    osr = commitQueue.poll()
  }
  if (!m.isEmpty) {
    consumer.commitAsync(m, commitCallback.get)
  }
}

因此,不幸的是,如果要将偏移作为事务提交,就必须将它们单独存储在自己的存储中,而不是使用kafka内置的偏移提交跟踪。

相关问题