import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}
1条答案
按热度按时间5vf7fwbs1#
您描述的场景只能在使用异步提交时发生。
请记住,一个特定的主题分区只能由同一个consumergroup中的单个使用者使用。如果您有两个消费者在阅读同一主题分区,那么这是唯一可能的
如果他们有不同的消费群体,或者
如果他们拥有相同的消费者群体,就会发生重新平衡。但是,一次只能有一个使用者读取该主题分区,而不会同时读取这两个分区。
案例1非常清楚:如果他们有不同的ConsumerGroup,那么他们并行地、独立地使用分区。此外,他们的补偿是分开管理的。
案例#2:如果第一个使用者由于失败/死亡且未恢复而未能提交偏移量10,则将发生使用者重新平衡,另一个活动使用者将拾取该分区。由于未提交偏移量10,新的使用者将再次开始读取偏移量10,然后跳到下一批并可能提交偏移量20。这导致“至少一次”语义,并可能导致重复。
现在,我们来看一个场景,在提交一个更高的偏移量之后,您可以提交一个更小的偏移量。如开头所说,如果异步提交偏移量(使用
commitAsync
). 设想以下场景,按时间排序:使用者读取偏移量0(后台线程尝试提交偏移量0)
提交偏移量0成功
使用者读取偏移量1(后台线程尝试提交偏移量1)
提交偏移量1失败,请稍后再试
使用者读取偏移量2(后台线程尝试提交偏移量2)
提交偏移量2成功
现在,该怎么办(你要提交偏移量1吗?)
如果让重试机制再次提交偏移量1,则看起来您的使用者只提交了偏移量1。这是因为最新偏移量par topicpartition上每个消费者组的信息都存储在内部压缩的kafka topic\u consumer\u offsets中,它只存储我们消费者组的最新值(在本例中:offset 1)。
在《Kafka-权威指南》一书中,有一个关于如何缓解这个问题的提示:
重试异步提交:为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
例如,您可以在下面的scala中看到这个想法的实现: