使用autocommit时消费者重新平衡

nbewdwxp  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(709)

我们正在使用具有以下配置的consumer kafka client 0.10.2.0:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

如你所见,我们正在使用自动提交。我们使用的消费者api版本有一个专门的线程来执行autocommit。所以每一秒我们都有一个自动提交,这意味着我们每一秒都有一个心跳。
我们的应用程序处理时间实际上可能(有时)超过40秒(请求超时间隔)
我想问的是:
1-如果处理时间需要,例如,一分钟。会有一个重新平衡,虽然有自动提交的心豆每秒钟?
2-更奇怪的是,在执行时间长的情况下,我们似乎不止一次收到相同的消息。正常吗?如果消费者承诺了补偿,为什么再平衡会使相同的补偿再次被使用?
谢谢,奥利尔

ilmyapht

ilmyapht1#

从kafka v0.10.1.0开始,您不需要手动触发auto commit来执行心跳。Kafka消费者自己在后台为心跳机制开创了一条新思路。要了解更多,请阅读kip-62。
在你的情况下,你可以设置 max.poll.interval.ms 处理器处理 max.poll.record 记录。

oipij1gg

oipij1gg2#

你可以用 KafkaConsumer.pause() / KafkaConsumer.resume() 以防止在长时间处理暂停期间消费者重新平衡。Java文档。看看这个问题。
第2条。你确定这些补偿已经兑现了吗?

ujv3wf0j

ujv3wf0j3#

只是澄清一下,在每次轮询中都会调用autocommit check,它会检查经过的时间是否大于配置的时间,如果是,则只执行提交
例如,如果提交间隔为5秒,轮询在7秒内发生,那么在这种情况下,提交将在7秒之后发生
谢谢你的提问
自动提交不计入心跳,如果处理时间长,那么提交显然不会发生,并将导致会话超时,进而触发重新平衡
除非您正在寻找/重置偏移量到以前提交的偏移量,或者发生了使用者重新平衡,否则不应该发生这种情况

相关问题