SpringKafka河创建多个流

mlnl4t2r  于 2021-07-23  发布在  Java
关注(0)|答案(0)|浏览(223)

我可以用下一种方法创建像 Spring Bean 一样的Kafka流:

@Bean
fun kStream(kStreamBuilder: StreamsBuilder): KStream<String, String> {
    val stream = kStreamBuilder
            .stream<String, String>("topic1")
    stream.map(mapValuesAndGetAnswer())
            .to("topic2")
    return stream

但是现在我需要为几个主题创建它。例如,我有一个包含in-topic和out-topic的数组,比如:intopic[“topic1”,“topic2”,“topic3”],outtopic[“outtopic1',“outtopic1',“outtopic3']。这些主题的数量和它们的名称需要从application.yaml中获取,所以我尝试用autowirecapablebeanfactory动态地创建这些流,但它不起作用

@PostConstruct
fun createKafkaBeans() {
    val beanFactory = applicationContext.autowireCapableBeanFactory
    for (i in kafkaStreamsProps.inboundTopic.indices) {
        val inTopic = kafkaStreamsProps.inboundTopic[i]
        val outTopic = kafkaStreamsProps.outboundTopic[i]
        val kStreamBuilder = StreamsBuilder();
        val stream = kStreamBuilder
                .stream<String, String>(inTopic)
        stream.map(mapValuesAndGetAnswer()).to(outTopic)
        val initializedBean = beanFactory.initializeBean(stream, "streamTopic$i")
        beanFactory.autowireBean(initializedBean)
    }
}

那么有没有办法实现这个逻辑呢?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题