commitfailedexception提交无法完成,因为组已重新平衡分区并将其分配给另一个成员

bvuwiixz  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(334)

我使用的是Kafka0.10.2,现在面临着一个失败的例外。比如:
无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
我已将max.poll.interval.ms设置为integer.max\u值。有人能告诉我为什么即使我已经设定了值,这种情况仍然会发生吗?
另一个问题是:我按照说明将session.timeout.ms设置为60000,但仍然会发生这种情况。我试着用一个简单的代码复制

public static void main(String[] args) throws InterruptedException {     
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

当我将session.timeout.ms设置为10000时,我尝试在轮询循环中睡眠超过10000毫秒,但它似乎工作正常,也没有例外。所以我很困惑。如果heartbeat是由consumer.poll和consumer.commit触发的,则在我的代码中,heartbeat似乎超出了会话超时。为什么不抛出commitfailedexception?

u3r8eeie

u3r8eeie1#

为此,您需要处理代码中的重新平衡条件,并且应该处理正在进行的消息并在重新平衡之前提交它
比如:

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement what you want to do once rebalancing is done.
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // commit current method
    }
}

并使用以下语法订阅主题:
kafkaconsumer.subscribe(topicnamelist,new handlerebalance())
这样做的好处是:
当重新平衡发生时,消息不会重复。
无提交失败异常

tquggr8v

tquggr8v2#

session.timeout.ms 设定的耗电量应小于 group.max.session.timeout.ms Kafka的经纪人。
这为我解决了问题。
github链接提交失败的原因

相关问题