Kafka LEADER_NOT_AVAILABLE even though the topic has a leader

2wnc66cl  posted on 2021-06-07  in  Kafka

We have 3 Kafka (1.0.0) nodes and a topic with 4 partitions and a replication factor of 3. The topic normally looks like this:

Topic:MissionControlTopic   PartitionCount:4    ReplicationFactor:3 Configs:
Topic: MissionControlTopic  Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 2,1,0
Topic: MissionControlTopic  Partition: 1    Leader: 1   Replicas: 1,2,0 Isr: 2,1,0
Topic: MissionControlTopic  Partition: 2    Leader: 2   Replicas: 2,0,1 Isr: 2,1,0
Topic: MissionControlTopic  Partition: 3    Leader: 0   Replicas: 0,2,1 Isr: 2,1,0

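Every so often, node 0 stops responding (that is a problem, but not the one this question is about). When that happens, the other two nodes correctly take over its partitions, and the topic looks like this: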

Topic:MissionControlTopic   PartitionCount:4    ReplicationFactor:3 Configs:
Topic: MissionControlTopic  Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 2,1
Topic: MissionControlTopic  Partition: 1    Leader: 1   Replicas: 1,2,0 Isr: 2,1
Topic: MissionControlTopic  Partition: 2    Leader: 2   Replicas: 2,0,1 Isr: 2,1
Topic: MissionControlTopic  Partition: 3    Leader: 2   Replicas: 0,2,1 Isr: 2,1
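
The two listings above can also be compared mechanically. The following is a minimal sketch, not part of the original setup, that parses `--describe` output in the format shown and reports which assigned replicas are missing from the ISR for each partition. The class name `IsrCheck` and the method `missingFromIsr` are hypothetical names introduced for illustration, and the regular expression assumes the exact line layout shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class IsrCheck {
    // Matches one partition line of the describe output shown above.
    // Groups: 1 = partition id, 2 = replica list, 3 = ISR list.
    private static final Pattern LINE = Pattern.compile(
        "Partition:\\s*(\\d+)\\s+Leader:\\s*-?\\d+\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    /** Returns partition id -> assigned replicas that are not currently in the ISR. */
    static Map<Integer, List<String>> missingFromIsr(String describeOutput) {
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (String line : describeOutput.split("\\R")) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // the summary line (PartitionCount etc.) does not match
            }
            List<String> missing = new ArrayList<>(Arrays.asList(m.group(2).split(",")));
            missing.removeAll(Arrays.asList(m.group(3).split(",")));
            if (!missing.isEmpty()) {
                result.put(Integer.parseInt(m.group(1)), missing);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String afterFailure = String.join("\n",
            "Topic:MissionControlTopic   PartitionCount:4    ReplicationFactor:3 Configs:",
            "Topic: MissionControlTopic  Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 2,1",
            "Topic: MissionControlTopic  Partition: 1    Leader: 1   Replicas: 1,2,0 Isr: 2,1",
            "Topic: MissionControlTopic  Partition: 2    Leader: 2   Replicas: 2,0,1 Isr: 2,1",
            "Topic: MissionControlTopic  Partition: 3    Leader: 2   Replicas: 0,2,1 Isr: 2,1");
        // Prints {0=[0], 1=[0], 2=[0], 3=[0]}: broker 0 has dropped out of every ISR.
        System.out.println(missingFromIsr(afterFailure));
    }
}
```

For the healthy listing the result is an empty map, since every assigned replica is still in the ISR.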

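At this point, most (but not all) producers and consumers can no longer write to or read from Kafka and keep logging LEADER_NOT_AVAILABLE exceptions (problem one). Once node 0 is restored and the leaders are rebalanced, the applications still log the exceptions (problem two). Only after the applications are restarted can they reconnect and work normally. As you can imagine, restarting every application whenever a Kafka node has a problem is highly impractical.
I'm not sure what information would be useful here for solving this. We have searched the internet but found nothing indicating that anything is obviously wrong with our configuration. I even tried to reproduce the problem locally, but there the applications reconnect correctly once the node is restored.
Here is the code that writes to Kafka: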

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericEventSerializer.class.getName());

kafkaProducer = new KafkaProducer<>(properties);

// And at some later point...
kafkaProducer.send(new ProducerRecord<>(TOPIC, event), (metadata, exception) -> {
    if (exception != null)
    {
        LOGGER.error("Failed to write to Kafka", exception);
    }
});
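
One detail of the producer configuration above may be worth experimenting with. With `retries` set to 0, a send that fails with a transient error is handed to the callback without another attempt. The sketch below shows a variant that allows retries and refreshes metadata more often. It is an experiment to try, not a confirmed fix for the problem described here. The class name `ProducerRetrySketch` is hypothetical, the numeric values are illustrative assumptions, and the serializer settings are omitted for brevity.

```java
import java.util.Properties;

class ProducerRetrySketch {
    /** Builds producer settings that let the client retry transient send failures. */
    static Properties retryingProducerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("acks", "all");
        // Assumed value: retry a failed send a few times instead of failing immediately.
        p.put("retries", 5);
        // Assumed value: wait between retries so a leader election has time to finish.
        p.put("retry.backoff.ms", 500);
        // Assumed value: force a metadata refresh more often than the 5-minute default.
        p.put("metadata.max.age.ms", 30000);
        return p;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address, not taken from the question.
        Properties p = retryingProducerProps("localhost:9092");
        System.out.println(p.get("retries"));
    }
}
```

Note that enabling retries while more than one request is in flight per connection can change the order in which records are written, so this trade-off needs to be acceptable for the application.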

Here is the code that reads from it:

Properties props = new Properties();
props.put("enable.auto.commit", false);
props.put("bootstrap.servers", kafkaHostString);
props.put("group.id", consumerGroupId);
props.put("request.timeout.ms", 15000);
props.put("session.timeout.ms", 10000);
props.put("max.poll.records", 10000);
props.put("batch.size", 6400000); // note: batch.size is a producer setting; the consumer does not use it

Consumer<String, GenericEvent> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new GenericEventDeserializer());
consumer.subscribe(Collections.singleton(topic));

// And at some later point ...
records = consumer.poll(pollTimeout);
consumer.commitSync();

`advertised.host.name`, `advertised.port`, and `advertised.listeners` are all set in `server.properties`.
