我有一个与Kafka一起探索Spring的示例项目(在这里找到)。我有一个监听器订阅了主题我的测试主题上游,这将只是lo的消息和键,并发布相同的另一个主题我的测试主题下游。我试过这是本地Kafka(docker撰写文件在那里)和它的作品。
现在我试着用一个嵌入式Kafka服务器来编写一个测试。在测试中,我有一个嵌入式服务器正在启动(testcontext.java),它应该在测试之前启动(在测试之前重写junit)。
private static EmbeddedKafkaBroker kafka() {
EmbeddedKafkaBroker kafkaEmbedded =
new EmbeddedKafkaBroker(
3,
false,
1,
"my-test-topic-upstream", "my-test-topic-downstream");
Map<String, String> brokerProperties = new HashMap<>();
brokerProperties.put("default.replication.factor", "1");
brokerProperties.put("offsets.topic.replication.factor", "1");
brokerProperties.put("group.initial.rebalance.delay.ms", "3000");
kafkaEmbedded.brokerProperties(brokerProperties);
try {
kafkaEmbedded.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
return kafkaEmbedded;
}
然后我创建一个生产者(tickproducer)并发布一条消息到我希望我的听众能够使用的主题。
public TickProducer(String brokers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
}
public RecordMetadata publishTick(String brand)
throws ExecutionException, InterruptedException {
return publish(TOPIC, brand, Instant.now().toString());
}
private RecordMetadata publish(String topic, String key, String value)
throws ExecutionException, InterruptedException {
final RecordMetadata recordMetadata;
recordMetadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
producer.flush();
return recordMetadata;
}
我看到下面的日志消息继续记录。
11:32:35.745 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Connection to node -1 could not be established. Broker may not be available.
最终以失败告终
11:36:52.774 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
有什么建议吗?
1条答案
按热度按时间bpzcxfmw1#
查看信息日志
ConsumerConfig
看看他想联系什么(把它和ProducerConfig
). 我怀疑你没有更新Spring Bootbootstrap-servers
属性指向嵌入的代理。看到了吗
设置为
spring.kafka.bootstrap-servers
然后用它来代替SPRING_EMBEDDED_KAFKA_BROKERS
.顺便说一句,使用
@EmbeddedKafka
而不是自己示例化服务器。