commitfailedexception:无法完成提交,因为组已重新平衡

lnxxn5zx  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(467)

恩vironment:hadoop2.75.+flink1.4+kafka0.10
我已经建立了一个实时数据处理项目。我使用flink表源api(kafka010jsontablesource)作为tablasource。从kafka获取数据,然后执行一个sql,最后输出到一个kafka主题,这是一个清晰的流程,但是我在flink cluster上执行时遇到了异常,下面是我的主要代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000) 
val tableEnv = TableEnvironment.getTableEnvironment(env)
val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
    .forTopic(kafkaConfig.topic)
    .withKafkaProperties(props)
    .withSchema(dynamicJsonSchema)
    .withRowtimeAttribute(
         enventTimeFieldName,  
         new ExistingField(enventTimeFieldName),  
         new MyBoundedOutOfOrderTimestamps(100L)) 
    .build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))

我已经启用了检查点。第一次在flink上执行时,我只是遵循consumer的默认配置。在flink任务运行之后,我检查了kafka shell命令(kafka consumer groups.sh)的偏移量,发现了一个奇怪的情况。根据flink task manager的shell命令输出和日志,我发现偏移量在几秒钟开始时已成功提交,但后来我继续遇到许多异常,如下所示:
blockquote 2018-01-19 09:24:03174警告org.apache.flink.streaming.connectors.kafka.internal.kafka09fetcher-将偏移提交到kafka失败。这不会影响Flink的检查点。org.apache.kafka.clients.consumer.commitfailedexception:无法完成提交,因为组已重新平衡分区并将其分配给其他成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。在org.apache.kafka.clients.consumer.internals.consumercoordinator$offsetcommitresponsehandler.handle(consumercoordinator。java:792)在org.apache.kafka.clients.consumer.internals.consumercoordinator$offsetcommitresponsehandler.handle(consumercoordinator。java:738)在org.apache.kafka.clients.consumer.internals.abstractcoordinator$coordinatorresponsehandler.onsuccess(abstractcoordinator)。java:808)在org.apache.kafka.clients.consumer.internals.abstractcoordinator$coordinatorresponsehandler.onsuccess(abstractcoordinator。java:788)在org.apache.kafka.clients.consumer.internals.requestfuture$1.onsuccess(requestfuture。java:204)在org.apache.kafka.clients.consumer.internals.requestfuture.firesucces(requestfuture。java:167)位于org.apache.kafka.clients.consumer.internals.requestfuture.complete(requestfuture。java:127)在org.apache.kafka.clients.consumer.internals.consumernetworkclient$requestfuturecompletionhandler.firecompletion(consumernetworkclient)。java:488)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.firependingcompletedrequests(consumernetworkclient。java:348)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:262)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:208)在org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer。java:1096)在org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer。java:1043)在org.apache.flink.streaming.connectors.kafka.internal.kafkaconsumerthread.run(kafkaconsumerthread。java:247)2018-01-19 09:24:03174 warn org.apache.flink.streaming.connectors.kafka.flinkkafcumerbase-异步kafka提交失败。org.apache.kafka.clients.consumer.commitfailedexception:无法完成提交,因为组已重新平衡分区并将其分配给其他成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。在org.apache.kafka.clients.consumer.internals.consumercoordinator$offsetcommitresponsehandler.handle(consumercoordinator。java:792)在org.apache.kafka.clients.consumer.internals.consumercoordinator$offsetcommitresponsehandler.handle(consumercoordinator。java:738)在org.apache.kafka.clients.consumer.internals.abstractcoordinator$coordinatorresponsehandler.onsuccess(abstractcoordinator)。java:808)在org.apache.kafka.clients.consumer.internals.abstractcoordinator$coordinatorresponsehandler.onsuccess(abstractcoordinator。java:788)在org.apache.kafka.clients.consumer.internals.requestfuture$1.onsuccess(requestfuture。java:204)在org.apache.kafka.clients.consumer.internals.requestfuture.firesucces(requestfuture。java:167)位于org.apache.kafka.clients.consumer.internals.requestfuture.complete(requestfuture。java:127)在org.apache.kafka.clients.consumer.internals.consumernetworkclient$requestfuturecompletionhandler.firecompletion(consumernetworkclient)。java:488)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.firependingcompletedrequests(consumernetworkclient。java:348)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:262)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:208)在org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer。java:1096)在org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer。java:1043)在org.apache.flink.streaming.connectors.kafka.internal.kafkaconsumerthread.run(kafkaconsumerthread。java:247)
所以我根据上面的错误信息搜索了解决方案,有人告诉我应该增加session.timeout.ms,然后我照做了,但是还是失败了。之后我试着对下面的多种组合配置进行测试,Kafka偏移量一开始总是提交成功,但后来提交失败。我真的不知道怎么解决,你能帮我解决吗?非常感谢!!!!!!
Kafka消费者配置组合如下: { "propertyKey": "session.timeout.ms", "propertyValue": "300000" }, { "propertyKey": "request.timeout.ms", "propertyValue": "505000" }, { "propertyKey": "auto.commit.interval.ms", "propertyValue": "10000" }, { "propertyKey": "max.poll.records", "propertyValue": "50" }, { "propertyKey": "max.poll.interval.ms", "propertyValue": "500000" }, { "propertyKey": "client.id", "propertyValue": "taxi-client-001" }, { "propertyKey": "heartbeat.interval.ms", "propertyValue": "99000" } 我试图将上述配置更改为所有类型的值,但都失败了,甚至我根据kafka官方文档指南对它们进行了配置。我希望你能帮助修复以上错误,非常感谢!!!

jm81lzqq

jm81lzqq1#

我找到了根本原因。重新平衡错误总是发生的原因是两个使用者(一个是使用者输入数据,另一个是使用者输出数据)组名称相同。我怀疑只有一个协调器没有足够的能力来处理两个消费者的offset commit操作。错误从未发生过。

相关问题