override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
val untilOffsets = clamp(latestOffsets())
// Create KafkaRDD and other irrelevant code
currentOffsets = untilOffsets
commitAll()
Some(rdd)
}
哪里 commitAll 只需轮询正在被填满的队列 commitAsync :
protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
var osr = commitQueue.poll()
while (null != osr) {
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
m.put(tp, new OffsetAndMetadata(offset))
osr = commitQueue.poll()
}
if (!m.isEmpty) {
consumer.commitAsync(m, commitCallback.get)
}
}
1条答案
按热度按时间xzv2uavs1#
在当前批处理结束时是否有提交偏移量的解决方法
不是通过
commitAsync
应用程序编程接口。方法调用所做的是将要提交的偏移量排队,然后在DirectKafkaInputDStream.compute
异步提交:哪里
commitAll
只需轮询正在被填满的队列commitAsync
:因此,不幸的是,如果要将偏移作为事务提交,就必须将它们单独存储在自己的存储中,而不是使用kafka内置的偏移提交跟踪。