我在Confluent Cloud上创建了一个Kafka集群,但无法连接到它。当我运行producer时,我得到以下错误:
[Producer clientId=producer-1]节点-1已断开连接。2023-06-05T06:09:20.826+05:30 INFO 25324 --- [ad| producer-1] org.apache.Kafka.clients.NetworkClient:[Producer clientId=producer-1]由于节点-1断开连接,已取消相关ID为189的正在进行的API_VERSIONS请求(创建后经过的时间:253 ms,自发送后经过的时间:253 ms,请求超时:30000ms)2023-06-05T06:09:20.827+05:30 WARN 25324 --- [ad| producer-1] org.apache.Kafka.clients.NetworkClient:[Producer clientId=producer-1] Bootstrap broker(id:机架:空)断开
我尝试创建一个新的集群,但结果是一样的。我正在使用Sping Boot 连接到群集。
下面是配置:
spring.Kafka.properties.sasl.mechanism=PLAIN spring.kafka.properties.bootstrap.servers=broker-address-here:9092 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='api-key-here' password='api-secret-here'; spring.Kafka.properties.security.protocol=SASL_SSL spring.kafka.properties.session.timeout.ms =45000
Sping Boot beans:
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.properties.bootstrap.servers}")
private String bootStrapServer;
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<String, String>(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer,
AdminClientConfig.RETRIES_CONFIG, 0,
ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
下面是我的控制器,它试图向集群发送消息:
@RestController
@RequestMapping("/produce")
public final class LogProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "logs";
// Publish messages using the GetMapping
@PostMapping("/logs/v1")
public String publishMessage()
{
// Sending the message
kafkaTemplate.send(TOPIC, "sample log message");
return "Published Successfully";
}
}
有什么建议吗,我做错了什么?
1条答案
按热度按时间qf9go6mv1#
我找到问题所在了。
问题是spring Boot 拒绝从application.properties中获取此信息。我需要将所有信息注入Producerfactory bean中,只有这样它才能工作。我还尝试完全删除Producerfactorybean,认为可能会有冲突,但删除Producerfactorybean也没有帮助。
下面是我为Producerfactory更新的代码
我不知道春 Boot 为什么要这么做。