翻阅Kafka消费者代码,可以看到Kafka记录中有以下代码:
if (!first) {
// skip one poll timeout before trying again
long delay = endpoint.getConfiguration().getPollTimeoutMs();
log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
此延迟的默认值为5秒。我试图了解为什么这是必需的,也是否会有任何问题,如果我减少到1秒。5秒似乎是很多时间,据我所知,这将是一个完全停止任何处理。
引用文件:https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/kafkaconsumer.java
线路编号:198
暂无答案!
目前还没有任何答案,快来回答吧!