我们正在使用具有以下配置的consumer kafka client 0.10.2.0:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
如你所见,我们正在使用自动提交。我们使用的消费者api版本有一个专门的线程来执行autocommit。所以每一秒我们都有一个自动提交,这意味着我们每一秒都有一个心跳。
我们的应用程序处理时间实际上可能(有时)超过40秒(请求超时间隔)
我想问的是:
1-如果处理时间需要,例如,一分钟。会有一个重新平衡,虽然有自动提交的心豆每秒钟?
2-更奇怪的是,在执行时间长的情况下,我们似乎不止一次收到相同的消息。正常吗?如果消费者承诺了补偿,为什么再平衡会使相同的补偿再次被使用?
谢谢,奥利尔
3条答案
按热度按时间ilmyapht1#
从kafka v0.10.1.0开始,您不需要手动触发auto commit来执行心跳。Kafka消费者自己在后台为心跳机制开创了一条新思路。要了解更多,请阅读kip-62。
在你的情况下,你可以设置
max.poll.interval.ms
处理器处理max.poll.record
记录。oipij1gg2#
你可以用
KafkaConsumer.pause()
/KafkaConsumer.resume()
以防止在长时间处理暂停期间消费者重新平衡。Java文档。看看这个问题。第2条。你确定这些补偿已经兑现了吗?
ujv3wf0j3#
只是澄清一下,在每次轮询中都会调用autocommit check,它会检查经过的时间是否大于配置的时间,如果是,则只执行提交
例如,如果提交间隔为5秒,轮询在7秒内发生,那么在这种情况下,提交将在7秒之后发生
谢谢你的提问
自动提交不计入心跳,如果处理时间长,那么提交显然不会发生,并将导致会话超时,进而触发重新平衡
除非您正在寻找/重置偏移量到以前提交的偏移量,或者发生了使用者重新平衡,否则不应该发生这种情况