我使用的是Kafka0.10.2,现在面临着一个失败的例外。比如:
无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
我已将max.poll.interval.ms设置为integer.max\u值。有人能告诉我为什么即使我已经设定了值,这种情况仍然会发生吗?
另一个问题是:我按照说明将session.timeout.ms设置为60000,但仍然会发生这种情况。我试着用一个简单的代码复制
public static void main(String[] args) throws InterruptedException {
Logger logger = Logger.getLogger(KafkaConsumer10.class);
logger.info("XX");
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9098");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000");
props.put("session.timeout.ms", "10000");
props.put("max.poll.records", "2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t1"));
while (true) {
Thread.sleep(11000);
ConsumerRecords<String, String> records = consumer.poll(100);
//Thread.sleep(11000);
Thread.sleep(11000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
当我将session.timeout.ms设置为10000时,我尝试在轮询循环中睡眠超过10000毫秒,但它似乎工作正常,也没有例外。所以我很困惑。如果heartbeat是由consumer.poll和consumer.commit触发的,则在我的代码中,heartbeat似乎超出了会话超时。为什么不抛出commitfailedexception?
2条答案
按热度按时间u3r8eeie1#
为此,您需要处理代码中的重新平衡条件,并且应该处理正在进行的消息并在重新平衡之前提交它
比如:
并使用以下语法订阅主题:
kafkaconsumer.subscribe(topicnamelist,new handlerebalance())
这样做的好处是:
当重新平衡发生时,消息不会重复。
无提交失败异常
tquggr8v2#
session.timeout.ms
设定的耗电量应小于group.max.session.timeout.ms
Kafka的经纪人。这为我解决了问题。
github链接提交失败的原因