java 在 quarkus 中处理Kafka消息时出现异常

oxosxuxt  于 2023-02-07  发布在  Java
关注(0)|答案(1)|浏览(218)

在 quarkus 过程中,一旦从Kafka那里轮询到信息,我们就执行以下步骤

  1. Thread.sleep(30000)-由于业务逻辑
    1.调用第三方API
    1.调用另一个第三方API
    1.在数据库中插入数据
    几乎每天都会有一次,进程在抛出TooManyMessagesWithoutAck异常后挂起。
2022-12-02 20:02:50 INFO  [2bdf7fc8-e0ad-4bcb-87b8-c577eb506b38,     ] : Going to sleep for 30 sec.....
2022-12-02 20:03:20 WARN  [                    kafka] : SRMSG18231: The record 17632 from topic-partition '<partition>' has waited for 60 seconds to be acknowledged. This waiting time is greater than the configured threshold (60000 ms). At the moment 2 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 17631. This error is due to a potential issue in the application which does not acknowledged the records in a timely fashion. The connector cannot commit as a record processing has not completed.
2022-12-02 20:03:20 WARN  [                     kafka] : SRMSG18228: A failure has been reported for Kafka topics '[<topic name>]': io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit$TooManyMessagesWithoutAckException: The record 17632 from topic/partition '<partition>' has waited for 60 seconds to be acknowledged. At the moment 2 messages from this partition are awaiting acknowledgement. The last committed offset for this partition was 17631.
2022-12-02 20:03:20 INFO  [2bdf7fc8-e0ad-4bcb-87b8-c577eb506b38,     ] : Sleep over!

下面是我们如何使用消息的示例

@Incoming("my-channel")
@Blocking
CompletionStage<Void> consume(Message<Person> person) {
     String msgKey = (String) person
        .getMetadata(IncomingKafkaRecordMetadata.class).get()
        .getKey();
        // ...
      return person.ack();
}

根据日志,轮询事件后仅经过了30秒,但抛出了60秒未发送Kafka确认的异常。抛出错误时,我检查了一整天的日志,以查看REST API调用是否花费了30秒以上的时间来获取数据,但我无法找到任何异常。
除了主题名、通道名、序列化器、反序列化器、组id和托管kafka连接细节之外,我们没有做任何特定的Kafka配置。
本主题中有4个分区,复制因子为3。此进程有3个pod正在运行。我们无法在开发人员和UAT环境中重现此问题。
我检查了配置选项,但找不到任何配置可能会有帮助: quarkus 斯Kafka参考

mp:
  messaging:
    incoming:
      my-channel:
        topic: <topic>
        group:
          id: <group id>
        connector: smallrye-kafka
        value:
          serializer: org.apache.kafka.common.serialization.StringSerializer
          deserializer: org.apache.kafka.common.serialization.StringDeserializer

有没有可能 quarkus 正在成批地确认信息,到那时等待时间已经达到阈值了?如果这个问题有任何其他的可能性,请评论。

y4ekin9u

y4ekin9u1#

我也遇到过类似的问题,我们的生产环境运行不同的 quarkus 服务与一个简单的3节点Kafka集群,我研究了这个问题很多-没有明确的答案。目前,我有两种方法来解决这个问题:
1.确保你在代码中确认了kafka消息。是否每个异常都被捕获并用“person.nack(exception);“(或“person.ack(()”-取决于您的失败策略)?请确保它是。如果没有执行ack()或nack(),则会抛出错误Throttled-Exception。如果根本没有执行任何操作,则大多数情况下都会出现此问题。
1.当这没有帮助时,我将commit-strategy切换为“latest”:mp.messaging.incoming.my-channel.commit-strategy=latest这会稍微慢一点,因为批提交被禁用了,但是在我的例子中运行稳定。如果你不知道提交策略和默认值,可以参考article by Escoffier
我知道,这并不能解决根本原因,但在绝望的时候有所帮助。问题一定是,一个或多个排队的消息没有及时得到确认,但我不能告诉你为什么。也许是应用程序逻辑太慢,但是我很难--和你一样--在本地重现这一点。你也可以试着用X11 M1 N1 X增加60秒的阈值,自己看一看,如果这有帮助的话。在我的情况下,它没有。也许别人可以分享他对这个问题的见解,并能为你提供一个真实的的解决方案。

相关问题