我非常喜欢使用spring boot中提供的所有配置来配置kafka消费者:
ConsumerFactory() => KafkaListenerContainerFactory() => consume(V message)
然而,我似乎也失去了对apachekafka包附带的消费者行为的所有控制,比如在sync和async提交之间来回切换、显式启动消费者并完全关闭它。使用spring kafka接口,您只需实现一个方法,就可以开始接收消息:
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeString(String message) {
System.out.println("Consumed message: " + message);
}
在我看来,所有这些消耗都发生在SpringBoot自动提供的一个单独线程上。。。有人能告诉我,在使用spring boot配置kafkaconsumers(以及同样的kafkaproducers)时,如何仍然保持所有的控制吗?谢谢!
1条答案
按热度按时间guz6ccqo1#
在我看来,所有这些消耗都发生在springboot自动提供的单独线程上
没错,spring将kafka消费者 Package 在listener容器中。
最多可以通过容器属性配置行为。
理论上,您可以保留kafka consumer factory返回的最新值,但是您需要记住,对于单个容器,可以多次创建一个consumer。不管怎样,只要保留引用,您就可以访问底层的kafka消费者而无需进行反思。但这是一种黑客行为,可能会影响容器的行为。