kafka010jsontablesource不会自动提交kafka偏移量

uinbv5nw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(250)

我已经建立了当地的Kafka0.10+flink1.4环境。
我使用下面的代码从Kafka主题中获取消费者数据:

val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic(kafkaConfig.topic)
.withKafkaProperties(props)
.withSchema(dynamicJsonSchema)
.withRowtimeAttribute(enventTimeFieldName,new ExistingField(enventTimeFieldName), new BoundedOutOfOrderTimestamps(30000L))
.build() 

tableEnv.registerTableSource(tableName, tableSource)

val tableResult:Table = tableEnv.sqlQuery(sql)

执行此代码后,始终会发现警告消息:
自动提交组taxidatagroup的偏移量{taxidata-0=offsetandmetadata{offset=728461,metadata=''}}失败:无法完成提交,因为该组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
不管我在Kafka中设置了什么属性,它总是显示上面的警告信息。

{
    "propertyKey": "enable.auto.commit",
    "propertyValue": "true"
  },
  {
    "propertyKey": "session.timeout.ms",
    "propertyValue": "250000"
  },
  {
    "propertyKey": "request.timeout.ms",
    "propertyValue": "305000"
  },
  {
    "propertyKey": "auto.commit.interval.ms",
    "propertyValue": "800000"
  },
  {
    "propertyKey": "max.poll.records",
    "propertyValue": "300"
  },
  {
    "propertyKey": "max.poll.interval.ms",
    "propertyValue": "300000"
  }

我不确定kafka010jsontablesource flink1.4是否会自动提交偏移量。但是测试结果表明它不会自动提交偏移量。有人能帮忙确认这个问题吗?或者你能在我的代码中看到其他问题吗?

ttisahbt

ttisahbt1#

你试过把价格定低一点吗 session.timeout.ms 比经纪人的价值高 group.max.session.timeout.ms 价值?根据https://github.com/dpkp/kafka-python/issues/746,这似乎就是问题所在。

相关问题