引导代理已断开连接:无法连接到Kafka

6l7fqoea  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(129)

我在Confluent Cloud上创建了一个Kafka集群,但无法连接到它。当我运行producer时,我得到以下错误:
[Producer clientId=producer-1]节点-1已断开连接。2023-06-05T06:09:20.826+05:30 INFO 25324 --- [ad| producer-1] org.apache.Kafka.clients.NetworkClient:[Producer clientId=producer-1]由于节点-1断开连接,已取消相关ID为189的正在进行的API_VERSIONS请求(创建后经过的时间:253 ms,自发送后经过的时间:253 ms,请求超时:30000ms)2023-06-05T06:09:20.827+05:30 WARN 25324 --- [ad| producer-1] org.apache.Kafka.clients.NetworkClient:[Producer clientId=producer-1] Bootstrap broker(id:机架:空)断开
我尝试创建一个新的集群,但结果是一样的。我正在使用Sping Boot 连接到群集。
下面是配置:
spring.Kafka.properties.sasl.mechanism=PLAIN spring.kafka.properties.bootstrap.servers=broker-address-here:9092 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='api-key-here' password='api-secret-here'; spring.Kafka.properties.security.protocol=SASL_SSL spring.kafka.properties.session.timeout.ms =45000
Sping Boot beans:

@Configuration
public class KafkaConfiguration {
  
    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootStrapServer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {

      return new DefaultKafkaProducerFactory<String, String>(Map.of(
              AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
              AdminClientConfig.RETRIES_CONFIG, 0,
              ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
              ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
              ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
      ));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }
}

下面是我的控制器,它试图向集群发送消息:

@RestController
@RequestMapping("/produce")
public final class LogProducer {

    @Autowired 
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private static final String TOPIC = "logs";
 
    // Publish messages using the GetMapping
    @PostMapping("/logs/v1")
    public String publishMessage()
    {
 
        // Sending the message
        kafkaTemplate.send(TOPIC, "sample log message");
 
        return "Published Successfully";
    }    
}

有什么建议吗,我做错了什么?

qf9go6mv

qf9go6mv1#

我找到问题所在了。
问题是spring Boot 拒绝从application.properties中获取此信息。我需要将所有信息注入Producerfactory bean中,只有这样它才能工作。我还尝试完全删除Producerfactorybean,认为可能会有冲突,但删除Producerfactorybean也没有帮助。
下面是我为Producerfactory更新的代码

@Bean
public ProducerFactory<String, String> producerFactory() {

  //Defining these propeties in application.propeties only does not work. Need to create a Producerfactory bean and inject all these properties. Only then it works.
  return new DefaultKafkaProducerFactory<String, String>(Map.of(
          AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
          AdminClientConfig.RETRIES_CONFIG, 0,
          ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol,
          SaslConfigs.SASL_MECHANISM, saslMechanism,
          SaslConfigs.SASL_JAAS_CONFIG, saslJAAS
  ));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

我不知道春 Boot 为什么要这么做。

相关问题