我可以用下一种方法创建像 Spring Bean 一样的Kafka流:
@Bean
fun kStream(kStreamBuilder: StreamsBuilder): KStream<String, String> {
val stream = kStreamBuilder
.stream<String, String>("topic1")
stream.map(mapValuesAndGetAnswer())
.to("topic2")
return stream
但是现在我需要为几个主题创建它。例如,我有一个包含in-topic和out-topic的数组,比如:intopic[“topic1”,“topic2”,“topic3”],outtopic[“outtopic1',“outtopic1',“outtopic3']。这些主题的数量和它们的名称需要从application.yaml中获取,所以我尝试用autowirecapablebeanfactory动态地创建这些流,但它不起作用
@PostConstruct
fun createKafkaBeans() {
val beanFactory = applicationContext.autowireCapableBeanFactory
for (i in kafkaStreamsProps.inboundTopic.indices) {
val inTopic = kafkaStreamsProps.inboundTopic[i]
val outTopic = kafkaStreamsProps.outboundTopic[i]
val kStreamBuilder = StreamsBuilder();
val stream = kStreamBuilder
.stream<String, String>(inTopic)
stream.map(mapValuesAndGetAnswer()).to(outTopic)
val initializedBean = beanFactory.initializeBean(stream, "streamTopic$i")
beanFactory.autowireBean(initializedBean)
}
}
那么有没有办法实现这个逻辑呢?
暂无答案!
目前还没有任何答案,快来回答吧!