kafka消费者承诺失败异常

7y4bm7vi  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(818)

我在做Kafka消费计划。最近我们在prod环境中部署了它。在那里,我们面临以下问题:

[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
    at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)

我的理解是,当组协调器不可用并被重新发现时,心跳间隔(根据文档为3秒)过期,消费者被踢出组。是这样吗?。如果是这样的话,应该做些什么呢?。如果我错了,请帮助我理解这个问题,并提出任何想法,你必须解决这个问题。如果需要,我可以共享代码。

eanckbw9

eanckbw91#

我们也遇到了类似的问题,我们通过将max.poll.records从默认值500减少到了500,并减少了心跳间隔来解决。如果消息处理需要时间,并且轮询记录为500,则获得commitfailedexception的可能性很高。

bkhjykvo

bkhjykvo2#

你所指的例外

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

提示正在发生的事情以及可以采取什么措施来解决问题。在代码中,此异常描述为
“当kafkaconsumer#commitsync()的偏移量提交失败并出现不可恢复的错误时,将引发此异常。在成功应用提交之前完成组重新平衡时,可能会发生这种情况。在这种情况下,通常无法重试提交,因为某些分区可能已分配给组中的另一个成员。“
根据我的经验,抛出的错误消息可能是由不同的原因引起的,尽管它们都与不再分配给分区的使用者有关:
创造越来越多的消费者而不关闭他们
轮询超时
心跳超时
过期的kerberos票证

1. 打开越来越多的消费者而不关闭他们

如果将消费者添加到现有的消费者组,则会发生重新平衡。因此,必须在使用后关闭使用者,或者始终使用相同的示例,而不是为每个消息/迭代创建新的kafkaconsumer对象。

2. 轮询超时(如错误消息中所述):

[…]对poll()的后续调用之间的间隔时间比配置的 max.poll.interval.ms ,这通常意味着轮询循环花费了太多的时间来处理消息。
配置max.poll.interval.ms默认为 300000ms 或者 5minutes . 由于您的使用者占用的时间超过这5分钟,因此该使用者被视为失败,组将重新平衡,以便将分区重新分配给另一个成员(请参阅使用者配置)。

轮询超时解决方案:

错误消息中也给出了可能的解决方案
你可以通过增加 max.poll.interval.ms 或者通过减少poll()中返回的批的最大大小 max.poll.records .
使用者再次读取所有消息,因为(如错误所示)它无法提交偏移量。也就是说,如果你用同样的方法 group.id 它认为它从未读过这个主题的任何东西。

3. 心跳超时

Kafka消费者中有两种主要配置用于处理心跳: heartbeat.interval.ms 以及 session.timeout.ms .
在一个单独的后台线程中,kafkaconsumer定期向服务器发送心跳信号。如果使用者崩溃或在session.timeout.ms的持续时间内无法发送心跳,则使用者将被视为已死亡,并将重新分配其分区。如果触发了重新平衡,您的使用者将无法提交“旧分配”分区中的任何内容,正如commitfailedexception的描述中所述:“当组重新平衡在提交成功应用之前完成时,可能会发生这种情况。”

心跳超时解决方案:

增加设置 heartbeat.interval.ms 以及 session.timeout.ms 在遵循建议的同时: heartbeat.interval.ms 必须设置为低于 session.timeout.ms ,但通常应设置为不高于该值的1/3。”
请记住,改变这些值总是伴随着一种权衡。你也有
更频繁的再平衡,但更短的React时间,以确定死亡的消费者或
减少再平衡的频率和更长的React时间来确定死亡的消费者。

4. 过期的kerberos票证

在我们的生产集群上,我们看到commitfailedexception就在应用程序无法续订kerberos票证之后。

相关问题