import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}
2条答案
按热度按时间oprakyz71#
我很好奇并测试了它的行为。正如文件中所写,@haoyuwang在他的回答中所写的是正确的(+1)。
背后的原因很简单。使用者组的提交偏移量存储在内部主题的kafka中
__consumer_offsets
. 这个主题是compact
这意味着它要为给定的键提供最新的值。在您的例子中,键是使用者组、主题和分区的组合,而您的值是偏移量。如果你现在
提交偏移量10,由于稍后的异步进程
提交偏移量5
偏移量5将是
__consumer_offsets
主题。这意味着消费者将从该主题分区读取的下一个偏移量是偏移量6,而不是偏移量11。如何繁殖
您可以通过(同步地)在常规提交之后提交一个较早的偏移量来复制并测试它,如下所示:
哪里
commitFirstMessage
定义为编辑:
如何避免使用commitasync提交较低的偏移量
在《Kafka-权威指南》一书中,有一条建议,即避免由于重试调用而提交较低的偏移量
commitAsync
:重试异步提交:为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
一个实现可能看起来像这样(实际上没有测试过!):
omhiaaxx2#
它只需将分区的偏移量设置为您指定的值,因此下次您将使用来自committedfost+1的消息。
的javadoc
commitAsync()
说:提交的偏移量应该是应用程序将使用的下一条消息,即lastprocessedmessageoffset+1。