我在运行时检测Kafka分区变化时遇到了一个问题。我使用Spring Integration配置了Kafka,我无法弄清楚如何在应用程序运行期间检测分区变化的数量。
主要问题是Kafka主题有10个分区,我的Kafka配置如下(它有10个线程。分区和线程之间的关系是1比1)。当我增加Kafka主题分区数(假设20个分区),应用程序无法读取进入新创建的分区的消息,而不重新启动。
有没有办法配置spring集成来感知这种变化?
先谢了。
IntegrationFlow flow = IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory, topic)
.configureListenerContainer(c-> c.concurrency(10))))
.transform(transformer)
.get();
....
1条答案
按热度按时间23c0lvtd1#
此操作在每个
metadata.max.age.ms
Consumer属性中自动执行(默认为5分钟)