akka stream kafka在消费者组中增加消费者触发重新平衡,因为撤销的分区导致commitfailedexception

aiazj4mn  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(273)

我认为这个问题与#539有关,但我不知道这是一个bug,还是应该由用户自己处理。
因此,我有一个消费者组,每当我增加该组中消费者的数量时,取消分区会导致以下错误:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:778)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:584)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1479)
    at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$commit(KafkaConsumerActor.scala:430)
    at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:210)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.Timers.aroundReceive(Timers.scala:55)
    at akka.actor.Timers.aroundReceive$(Timers.scala:40)
    at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

当我缩小消费者数量时,这种情况不会发生。我是说到目前为止我还没有注意到这一点。我假设这是因为,分区不是按比例缩小的。剩下的消费者只需要获得新的分区。
注意,我对消息进行分组并提交批处理。
下面是我的代码的样子

val source = Consumer.committableSource(consumerSettings, subscription)
      .async
      .groupBy(Int.MaxValue, computeNamedGraph)
      .groupedWithin(conf.tripleStoreSettings.batchSize, conf.tripleStoreSettings.batchWindowSec seconds)
      .map(toUpdateStatements)
      .async
      .mergeSubstreams
      .map(toHttpRequest)
      .map(p => p.data -> p)
      .via(poolClientFlow)
      .async
      .map { case (respone, payload) => Payload(respone, payload.offsets) }
      .mapConcat(handleResponse)
      .via(Committer.flow(committerDefaults.withMaxBatch(conf.tripleStoreSettings.batchSize)))

    val (killSwitch, streamResults) = source
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

    streamResults.onComplete {
      case Success(_) =>
        logger.info("Stream finished")
        system.terminate()
      case Failure(e) =>
        logger.error("Stream failed:", e)
        system.terminate()
    }

我的决策者只做了以下几点:

private val decider: Supervision.Decider = {
    e => {
      logger.error(s"Stream failed. ${e.getMessage} ${e.getStackTrace.map(_.toString).mkString("\n")}", e)
      Supervision.Stop
    }
  }

因此,根据我对#539的阅读,我了解到我有很多飞行中的信息需要回复,但由于重新发送,我无法回复。也就是说,当消费者数量增加时,会出现一些再平衡,包括重新销售。
我的服务至少有一次,所以我不介意其他消费者重新处理这些消息。我们没有最多一个交付约束。
我的问题是,在库以本机方式处理这些情况之前,我如何在发生吊销时提交它们,或者更好的是,丢弃它们,这样分配给它们所属的分区的使用者也将重新处理它们。
有什么建议吗?我检查了balancelistener,但我不知道如何在这种情况下使用它。
注意我的超时配置

val subscription = Subscriptions.topicPattern(conf.kafkaConsumer.sourceTopic)
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(conf.kafkaBroker.bootstrapServers)
      .withGroupId(conf.kafkaConsumer.groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, conf.kafkaConsumer.offsetReset)
      .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000000")
      .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100000")

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题