我已经建立了当地的Kafka0.10+flink1.4环境。
我使用下面的代码从Kafka主题中获取消费者数据:
val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic(kafkaConfig.topic)
.withKafkaProperties(props)
.withSchema(dynamicJsonSchema)
.withRowtimeAttribute(enventTimeFieldName,new ExistingField(enventTimeFieldName), new BoundedOutOfOrderTimestamps(30000L))
.build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)
执行此代码后,始终会发现警告消息:
自动提交组taxidatagroup的偏移量{taxidata-0=offsetandmetadata{offset=728461,metadata=''}}失败:无法完成提交,因为该组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
不管我在Kafka中设置了什么属性,它总是显示上面的警告信息。
{
"propertyKey": "enable.auto.commit",
"propertyValue": "true"
},
{
"propertyKey": "session.timeout.ms",
"propertyValue": "250000"
},
{
"propertyKey": "request.timeout.ms",
"propertyValue": "305000"
},
{
"propertyKey": "auto.commit.interval.ms",
"propertyValue": "800000"
},
{
"propertyKey": "max.poll.records",
"propertyValue": "300"
},
{
"propertyKey": "max.poll.interval.ms",
"propertyValue": "300000"
}
我不确定kafka010jsontablesource flink1.4是否会自动提交偏移量。但是测试结果表明它不会自动提交偏移量。有人能帮忙确认这个问题吗?或者你能在我的代码中看到其他问题吗?
1条答案
按热度按时间ttisahbt1#
你试过把价格定低一点吗
session.timeout.ms
比经纪人的价值高group.max.session.timeout.ms
价值?根据https://github.com/dpkp/kafka-python/issues/746,这似乎就是问题所在。