让我描述一下我的问题背后的基本原理:
我们有一个基于micronaut的应用程序,它使用来自kafka代理的消息。
所消耗的消息被处理并馈送到另一个远程“下游”应用程序。
如果这个下游应用程序要有目的地重新启动,则需要一段时间来准备接受来自基于micronaut的应用程序的进一步消息。
因此,我们可以向micronaut应用程序发送一个请求,以暂停/暂停来自kafka的消息的使用(例如,通过http到适当的端点)。kafkaconsumer接口似乎有适当的方法来实现这个目标,比如
public void pause(java.util.Collection<TopicPartition> partitions)
public void resume(java.util.Collection<TopicPartition> partitions)
但是,如何获得对输入到http端点的适当kafkaconsumer示例的引用呢?
我们尝试将其注入到http端点/控制器类的构造函数中,但这会产生
Error instantiating bean of type [HttpController]
Message: Missing bean arguments for type: org.apache.kafka.clients.consumer.KafkaConsumer. Requires arguments: AbstractKafkaConsumerConfiguration consumerConfiguration
可以使用micronaut kafka文档中描述的@topic annotated receive methods获取kafkaconsumer示例的引用作为方法参数,但这会导致将该引用存储为示例变量,并通过http端点访问它,等等。。。这听起来不是很有说服力:只有在收到下一条消息时,你才会得到一个关于Kafka消费者的引用!这可能适用于挂起/暂停,但不适用于恢复!
顺便说一下,对保存为示例变量的引用调用kafkaconsumer.resume(…)会产生
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2201)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2185)
at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1842)
[...]
我认为在实现kafkaconsumeraware接口来存储新创建的kafkaconsumer示例的引用时也是如此。
那么,有没有什么办法可以恰当地处理这个问题呢?
谢谢
基督教的
暂无答案!
目前还没有任何答案,快来回答吧!