我试图加载多个主题到一个@KafkaListener
,但遇到了麻烦,因为我相信它正在寻找一个常量值,但从application.yml
文件初始化topics
变量会导致一些问题,我想知道是否有人可以帮助我解决这个问题,或者为我提供如何将多个Kafka主题加载到一个Kafkatrix中的指导。
我可以在同一个@KafkaListener
中监听多个主题,方法是将它们传递到一个逗号分隔的对象中,如下所示:
@KafkaListener(topics = {
"flight-events",
"flight-time-events",
"service-events",
"flight-delay-events"
})
我意识到我可以用逗号分隔的值来表示主题,但我希望能够通过配置文件添加主题,而不是更改代码库中的代码。
我相信可能有一个问题,@ Kafkaiti需要接受一个常量值,而我无法将注解定义为常量,有什么方法可以解决这个问题吗?KafkaWebSocketConnector.java
@Component
public class KafkaWebSocketConnector
{
@Value("${spring.kafka.topics}")
private String[] topics;
@KafkaListener(topics = topics)
public void listen(ConsumerRecord<?, Map<String, String>> message)
{
log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
String dest = "/" + message.topic();
log.info("destination = {}", dest);
log.info("msg: {}", message);
messageTemplate.convertAndSend(dest, message.value());
}
}
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: kafka-websocket-connector
topics: flight-events,
flight-time-events,
canceled-events,
pax-events,
flight-delay-events
2条答案
按热度按时间pdsfdshx1#
来自@加里罗素的回答来自这个GitHub问题:
https://github.com/spring-projects/spring-kafka/issues/361
可以使用SpEL表达式;在EnableKafkaIntegrationTests中有一个例子。
在我的情况下
"#{'${spring.kafka.topics}'.split(',')}"
我能够实现上述代码,(由加里罗素提供),以回答上述问题。
nkkqxpd92#
谢谢@terrabl的回答。然而,有一个小错误。我们需要使用
.split(', ')
而不是.split(',')