在spring boot中配置kafka consumer=失去对消费者行为的控制?

v64noz0r  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我非常喜欢使用spring boot中提供的所有配置来配置kafka消费者:

ConsumerFactory() => KafkaListenerContainerFactory() => consume(V message)

然而,我似乎也失去了对apachekafka包附带的消费者行为的所有控制,比如在sync和async提交之间来回切换、显式启动消费者并完全关闭它。使用spring kafka接口,您只需实现一个方法,就可以开始接收消息:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeString(String message) {
    System.out.println("Consumed message: " + message);
}

在我看来,所有这些消耗都发生在SpringBoot自动提供的一个单独线程上。。。有人能告诉我,在使用spring boot配置kafkaconsumers(以及同样的kafkaproducers)时,如何仍然保持所有的控制吗?谢谢!

guz6ccqo

guz6ccqo1#

在我看来,所有这些消耗都发生在springboot自动提供的单独线程上
没错,spring将kafka消费者 Package 在listener容器中。
最多可以通过容器属性配置行为。
理论上,您可以保留kafka consumer factory返回的最新值,但是您需要记住,对于单个容器,可以多次创建一个consumer。不管怎样,只要保留引用,您就可以访问底层的kafka消费者而无需进行反思。但这是一种黑客行为,可能会影响容器的行为。

相关问题