akka 为什么启用了自动提交的Kafka客户端在使用者关闭期间提交最新生成的消息的偏移量,即使该消息还没有被使用?

new9mtju  于 2022-11-05  发布在  Kafka
关注(0)|答案(1)|浏览(126)

顶级域名:

  • 对于启用了自动提交的Kafka客户端,提交生成消息的偏移量是否为已使用(即使不是)的预期行为?(对于使用和生成相同主题的应用程序)
    详细说明:

我有一个简单的scala应用程序,其中有一个Akka actor,它使用来自Kafka主题的消息,如果在消息处理过程中发生任何异常,则向同一个主题生成消息。
TestActor.scala

override protected def processMessage(messages: Seq[ConsumerRecord[String, String]]): Future[Done] = {
    Future.sequence(messages.map(message => {
      logger.info(s"--CONSUMED: offset: ${message.offset()} message: ${message.value()}")
      // in actual implementation, some process is done here and if an exception occurs, the message is sent to the same topic as seen below
      sendToExceptionTopic(Instant.now().toEpochMilli)
      Thread.sleep(1000)
      Future(Done)
    })).transformWith(_ => Future(Done))
  }

这个演员每分钟开始一次,跑20秒然后停下来。
Starter.scala

def init(): Unit = {
    exceptionManagerActor ! InitExceptionActors

    system.scheduler.schedule(2.second, 60.seconds) {
      logger.info("started consuming messages")
      exceptionManagerActor ! ConsumeExceptions
    }
  }

ExceptionManagerActor.scala

private def startScheduledActor(actorRef: ActorRef): Unit = {
    actorRef ! Start

    context.system.scheduler.scheduleOnce(20.seconds) {
      logger.info("stopping consuming messages")
      actorRef ! Stop
    }
  }

BaseActorWithAutoCommit.scala

override def receive: Receive = {
    case Start =>
      consumerBase = consumer
        .groupedWithin(20, 2000.millisecond)
        .mapAsyncUnordered(10)(processMessage)
        .toMat(Sink.seq)(DrainingControl.apply)
        .run()

    case Stop =>
      consumerBase.drainAndShutdown().transformWith {
        case Success(value) =>
          logger.info("actor stopped")
          Future(value)
        case Failure(ex) =>
          logger.error("error: ", ex)
          Future.failed(ex)
      }
    //Await.result(consumerBase.drainAndShutdown(), 1.minute)
  }

使用此配置,在停止时,Kafka客户端将提交最新生成的消息的偏移量,就像它已被使用一样。
示例日志:

14:28:48.868 INFO - started consuming messages
14:28:50.945 INFO - --CONSUMED: offset: 97 message: 1
14:28:51.028 INFO - ----PRODUCED: offset: 98 message: 1643542130945
...
14:29:08.886 INFO - stopping consuming messages
14:29:08.891 INFO - --CONSUMED: offset: 106 message: 1643542147106
14:29:08.895 INFO - ----PRODUCED: offset: 107 message: 1643542148891 <------ this message was lost
14:29:39.946 INFO - actor stopped
14:29:39.956 INFO - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://test-consumer/system/Materializers/StreamSupervisor-2/$$a#1541548736] to Actor[akka://test-consumer/system/kafka-consumer-1#914599016] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://test-consumer/system/kafka-consumer-1#914599016] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14:29:48.866 INFO - started consuming messages <----- The message with offset 107 was expected in this cycle to consume but it was not consumed
14:30:08.871 INFO - stopping consuming messages
14:30:38.896 INFO - actor stopped

正如您从日志中看到的,生成了偏移量为107的消息,但在下一个周期中未使用该消息。

  • 实际上,我不是 akka 演员的Maven,也不知道这种情况是来自Kafka还是 akka ,但我觉得这似乎与自动提交有关。*

使用的依赖项版本:

lazy val versions = new {
  val akka = "2.6.13"
  val akkaHttp = "10.1.9"
  val alpAkka = "2.0.7"
  val logback = "1.2.3"
  val apacheCommons = "1.7"
  val json4s = "3.6.7"
}

libraryDependencies ++= {
  Seq(
    "com.typesafe.akka" %% "akka-slf4j" % versions.akka,
    "com.typesafe.akka" %% "akka-stream-kafka" % versions.alpAkka,
    "com.typesafe.akka" %% "akka-http" % versions.akkaHttp,
    "com.typesafe.akka" %% "akka-protobuf" % versions.akka,
    "com.typesafe.akka" %% "akka-stream" % versions.akka,
    "ch.qos.logback" % "logback-classic" % versions.logback,
    "org.json4s" %% "json4s-jackson" % versions.json4s,
    "org.apache.commons" % "commons-text" % versions.apacheCommons,
  )
}

一个示例源代码和步骤,以重现这种情况可以从this repository

gupuwyp2

gupuwyp21#

就Kafka而言,阿尔帕卡·Kafka一读到Kafka的信息,信息就被消费掉了。
这是在Alpakka Kafka内部的参与者将其发出到下游使用者进行应用程序级处理之前。
因此,Kafka自动提交(enable.auto.commit = true)将导致在消息发送到参与者之前提交偏移量。
Kafka关于偏移量管理的文档确实(在本文写作时)提到enable.auto.commit具有至少一次语义,但正如我在第一段中提到的,这是至少一次 * 交付 * 语义,而不是至少一次 * 处理 * 语义。后者是应用程序级别的问题,要实现这一点需要延迟偏移量提交,直到处理完成。
Alpakka Kafka的文档有an involved discussion about at-least-once processing:在这种情况下,至少一次处理可能需要引入手动偏移量提交并将mapAsyncUnordered替换为mapAsync(因为mapAsyncUnordered与手动偏移量提交结合使用意味着您的应用程序只能保证来自Kafka的消息至少被处理零次)。
在Alpakka Kafka中,信息处理的广泛分类保证了:

  • 最多硬一次:Consumer.atMostOnceSource-在处理每条消息之前提交
  • 最多软一次:enable.auto.commit = true-“soft”,因为提交实际上是批处理的,以提高吞吐量,所以这实际上是“最多一次,除非它是至少一次”
  • 硬至少一次:仅在验证所有处理均成功后手动提交
  • 至少软一次:在某些处理完成后手动提交(即“至少一次,除非是最多一次”)
  • 恰好一次:一般情况下不可能,但如果您的处理具有重复数据消除功能,从而使重复项具有幂等性,则可以有效地使用一次

相关问题