我有一个与Kafka Spring 启动应用程序。
我想接收来自kafka主题的消息(主题是动态创建的,所以我在不同的线程中使用topicpattern=“my tompic-.*),这样,如果来自主题的消息my topic-正在处理很长时间-不要等待,也不要阻止处理来自其他主题的消息(my-topic-uuid1、my-topic-uuid2等)。
例如,我按以下顺序传递消息:
00:00:00-消息1->my-topic-uuid1
00:00:01-消息2->my-topic-uuid2
消息处理需要5分钟。我想在我的kafkalistener处理完消息1时开始处理消息2而不必等待。
我知道它只可能作为每个特定分区的独立线程,但我不知道如何用kafkalistener和topicpattern配置它。
@KafkaListener(id = "MyConsumerID", topicPattern = "${kafka.myTopicsPattern}", groupId = "my-app",
containerFactory = "myKafkaListenerContainerFactory")
public void receiveMessage(ConsumerRecord<String, byte[]> record) {
//save bytes to file
//convert file
//send file to another service
//throw exception if sending failed (auto-commit is true, so I keep message uncommitted in topic in case of exception)
}
我认为只有编写定制的使用者,在创建新主题时在单独的线程中启动它,轮询消息并在最后停止这个使用者,才有可能。但这似乎不是个好办法。
有人面对过这样的案子吗?
暂无答案!
目前还没有任何答案,快来回答吧!