我已经成功地将kafka出站通道适配器与固定主题名集成。现在,我想使主题名可配置,因此,希望通过应用程序属性公开它。
application.properties包含以下条目之一:
kafkaTopic:testNewTopic
我的配置类如下所示:
@Configuration
@Component
public class KafkaConfig {
@Value("${kafkaTopic}")
private String kafkaTopicName;
@Bean
public String getTopic(){
return kafkaTopicName;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//this.brokerAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
}
在si-config.xml中,我使用了以下内容(例如:topic=“gettopic”):
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-template="kafkaTemplate"
auto-startup="true" sync="true" channel="inputToKafka" topic="getTopic">
</int-kafka:outbound-channel-adapter>
但是,当通过bean公开时,配置无法获取主题名称。但是当我硬编码主题名的值时,它工作得很好。
有人能告诉我我做错了什么吗?
做 topic
在kafka出站通道内接受被称为bean的值?
每个使用我的实用程序的应用程序都会提供不同的kafka主题名,因此如何将其外部化
1条答案
按热度按时间cwxwcias1#
这个
topic
属性表示字符串值。但是,它支持属性占位符解析:
以及上述bean的spel评估:
因为xml解析器配置允许这样做。
但是你要注意
KafkaTemplate
,注入<int-kafka:outbound-channel-adapter>
有defaultTopic
财产。因此,您不必担心xml。另外一个可用的选项是springintegrationannotations配置。您可以在其中定义
@ServiceActivator
对于KafkaProducerMessageHandler
@Bean
: