Spring Kafka与Kafka集群

ubby3x7f  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(370)

我已经在集群中配置了3个kafka,我正在尝试使用springkafka。
但在我杀了Kafka的首领之后,我就不能再向队列发送其他信息了。
我将spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094“以及我的主机文件中的所有名称。
Kafka版本0.10
有人知道如何正确配置吗?
编辑
我测试了一件事,发生了一个奇怪的行为。启动服务时,我会向topic发送一条消息(以强制创建)
代码:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

所以,这一次我没有启动kafka-1服务器(只有其他服务器),出现了一个例外:
org.springframework.kafka.core.kafkaproducerexception:发送失败;嵌套异常为org.apache.kafka.common.errors.timeoutexception:60000毫秒后更新元数据失败。
似乎SpringKafka只是试图连接第一个引导服务器。我使用的是SpringKafka1.3.5.release和Kafka0.10.1.1
编辑2
我做了你做的测试。当我移除第一个docker容器(kafka-1)时也会发生同样的情况,因为领导者已经改变了。因此,我的消费者(spring服务)无法使用这些消息。但是,当我再次启动kafka-1时,服务会收到我的消费者concurrentkafkalistenercontainerfactory的所有消息:

{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}
0lvr5msh

0lvr5msh1#

服务器地址之间需要逗号,而不是分号。
编辑
我只是做了个测试,没有问题:

spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

@SpringBootApplication
public class So50804678Application {

    public static void main(String[] args) {
        SpringApplication.run(So50804678Application.class, args);
    }

    @KafkaListener(id = "foo", topics = "so50804678")
    public void in(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50804678", 1, (short) 3);
    }

}

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

杀了首领,然后

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2

$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678

发送消息并被应用程序接收;日志中没有错误,只有警告:
无法建立到节点0的[consumer clientid=consumer-1,groupid=foo]连接。代理可能不可用。
然后我重启了死机服务器;停止我的应用程序;然后添加了这个代码。。。

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        while(true) {
            System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
            Thread.sleep(3_000);
        }
    };
}

再次,杀死现任领导人没有任何影响;一切恢复正常。
您可能需要调整服务器道具中的listeners/adverted.listeners属性。因为我的经纪人都在本地主机上,所以我让他们违约。

相关问题