我们使用Spring-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。每隔一段时间,一个生成器线程就会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试次数(当前设置为12,由依赖关系Spring-retry激活)。我们限制了重试,因为我们发送了大约1k msg/s(每个生产者示例),并且担心缓冲区的大小。这样我们就会经常丢失消息,这对下游消费者来说是不好的,因为我们不能简单地复制传入的流量。
错误消息为
[Producer clientId=producer-5] Received invalid metadata error in produce request on partition topic-21 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now
[Producer clientId=producer-5] Got error produce response with correlation id 974706 on topic-partition topic-21, retrying (8 attempts left). Error: NOT_LEADER_FOR_PARTITION
[Producer clientId=producer-5] Got error produce response with correlation id 974707 on topic-partition topic-21, retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
有什么已知的方法可以避免这种情况吗?我们是否应该恢复默认的max_int重试次数?为什么它一直发送到同一个代理,即使它的响应是NOT_LEADER_FOR_PARTITION?
欢迎任何提示。
编辑:我们刚刚注意到经纪人指标kafka_network_requestmetrics_responsequeuetimems在那个时间左右上升,但我们看到的最大值约为2.5s
2条答案
按热度按时间kzipqqlq1#
生成和获取请求都被发送到分区的前导副本。NotLeaderForPartitionException当请求被发送到现在不是该分区的前导副本的分区时,抛出异常。
客户端将关于每个分区的首部的信息作为缓存来维护。缓存管理的完整流程如下所示。
客户端需要通过在生产者配置中设置
metadata.max.age.ms
来刷新此信息。此标记的默认值为300000毫秒您可以阅读下面的Apache·Kafka文档。
https://kafka.apache.org/documentation/
请仔细阅读Sender.Java代码。
https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
您将在发件人代码中找到这两条错误消息。
metadata.max.age.ms
的默认值为3秒。我认为您应该减小此值,然后观察其行为。6kkfgxo02#
您需要适当的配置监听程序
我用的是码头--就像这样
编辑每个代理的server.properties
经纪人-1
经纪人-2
重要提示:请注意,内网和公网使用相同的
hostname
,因为消费者/生产者只能通过如下注册名称访问Kafka:编辑您的主机/etc/hosts