Kafka Producer发送消息时出现NOT_LEADER_FOR_PARTITION异常

fdbelqdn  于 2022-09-21  发布在  Kafka
关注(0)|答案(2)|浏览(1285)

我们使用Spring-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。每隔一段时间,一个生成器线程就会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试次数(当前设置为12,由依赖关系Spring-retry激活)。我们限制了重试,因为我们发送了大约1k msg/s(每个生产者示例),并且担心缓冲区的大小。这样我们就会经常丢失消息,这对下游消费者来说是不好的,因为我们不能简单地复制传入的流量。

错误消息为

[Producer clientId=producer-5] Received invalid metadata error in produce request on partition topic-21 due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.. Going to request metadata update now
[Producer clientId=producer-5] Got error produce response with correlation id 974706 on topic-partition topic-21, retrying (8 attempts left). Error: NOT_LEADER_FOR_PARTITION
[Producer clientId=producer-5] Got error produce response with correlation id 974707 on topic-partition topic-21, retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION

有什么已知的方法可以避免这种情况吗?我们是否应该恢复默认的max_int重试次数?为什么它一直发送到同一个代理,即使它的响应是NOT_LEADER_FOR_PARTITION?

欢迎任何提示。

编辑:我们刚刚注意到经纪人指标kafka_network_requestmetrics_responsequeuetimems在那个时间左右上升,但我们看到的最大值约为2.5s

kzipqqlq

kzipqqlq1#

生成和获取请求都被发送到分区的前导副本。NotLeaderForPartitionException当请求被发送到现在不是该分区的前导副本的分区时,抛出异常。

客户端将关于每个分区的首部的信息作为缓存来维护。缓存管理的完整流程如下所示。

客户端需要通过在生产者配置中设置metadata.max.age.ms来刷新此信息。此标记的默认值为300000毫秒

您可以阅读下面的Apache·Kafka文档。

https://kafka.apache.org/documentation/

请仔细阅读Sender.Java代码。

https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

您将在发件人代码中找到这两条错误消息。metadata.max.age.ms的默认值为3秒。我认为您应该减小此值,然后观察其行为。

6kkfgxo0

6kkfgxo02#

您需要适当的配置监听程序

我用的是码头--就像这样

services:
  zookeeper:
    container_name: zookeeper
    ports:
      - "2181:2181"
    ...
  broker-1:
    hostname: "broker-1.mydomain.com"
    ports:
      - "29091:29091"
    ...
  broker-2:
    hostname: "broker-2.mydomain.com"
    container_name: broker-2
    ports:
      - "29092:29092"

编辑每个代理的server.properties

经纪人-1

listeners: PRIVATE_HOSTNAME://broker-1.mydomain.com:9092,PUBLIC_HOSTNAME://broker-1.mydomain.com:29091
advertised.listeners: PRIVATE_HOSTNAME://broker-1.mydomain.com:9092,PUBLIC_HOSTNAME://broker-1.mydomain.com:29091
listener.security.protocol.map: PUBLIC_HOSTNAME:PLAINTEXT,PRIVATE_HOSTNAME:PLAINTEXT
inter.broker.listener.name: PRIVATE_HOSTNAME

经纪人-2

listeners: PRIVATE_HOSTNAME://broker-2.mydomain.com:9092, PUBLIC_HOSTNAME://broker-2.mydomain.com:29092
advertised.listeners: PRIVATE_HOSTNAME://broker-2.mydomain.com:9092, PUBLIC_HOSTNAME://broker-2.mydomain.com:29092
listener.security.protocol.map: PUBLIC_HOSTNAME:PLAINTEXT, PRIVATE_HOSTNAME:PLAINTEXT
inter.broker.listener.name: PRIVATE_HOSTNAME

重要提示:请注意,内网和公网使用相同的hostname,因为消费者/生产者只能通过如下注册名称访问Kafka:

[BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use broker broker-1.mydomain.com:9092
...
    [BrokerToControllerChannelManager broker=2 name=forwarding]: Recorded new controller, from now on will use broker broker-2.mydomain.com:9092

编辑您的主机/etc/hosts

127.0.0.1   broker-1.mydomain.com
127.0.0.1   broker-2.mydomain.com

相关问题