我看到了这样一个问题:经过一段时间,随机地,消费者停止消费某个主题的消息,并且并没有抛出任何错误。主题中没有太多消息,因为这是一个测试环境。
该主题的消费者延迟将大于0,并且将被固定在该数字上(就像没有消费者消费它一样)。我使用kafka工具检查了这个主题,我可以看到分区被分配给了一个使用者。
我正在运行多个消费者线程来消费一个主题,下面是代码:
public Runnable getRunnable() {
return () -> {
final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate =
kafkaLib.createKafkaConsumerTemplate();
kafkaConsumerTemplate.receive()
.concatMap(record -> {
// process message and commit offset
})
.subscribe();
};
}
@Override
public void run(ApplicationArguments args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
executorService.execute(getRunnable());
}
}
我的问题是:如果在一段时间内没有消息推送到主题,那么kafkaconsumpertemplate.receive()是否有可能自己停止/退出/取消订阅?或者在订阅结束后线程会退出?
使用ReactKafka消费者,没有更多 @KafkaListener
所以我使用applicationrunner并生成消费者线程。有没有另一种正确的方法来启动ReactKafka消费者?
暂无答案!
目前还没有任何答案,快来回答吧!