新的Flink Kafka(KafkaSource)可以从旧的FlinkKafkaConsumer的保存点/检查点启动吗?

jdzmm42g  于 2023-02-01  发布在  Apache
关注(0)|答案(2)|浏览(232)

我有一个使用旧Flink Kafka Consumer(FlinkKafkaConsumer)运行的作业,现在我想将其迁移到KafkaSource。但我不确定此迁移会产生什么影响。我希望作业从旧FlinkKafkaConsumer最近成功执行的检查点开始,这可能吗?如果不可能,我应该如何正确地迁移Kafka Consumer?

pdsfdshx

pdsfdshx1#

假设配置相同,只要之前的使用者组ID配置与之前实现使用的组ID配置匹配,这两个配置应该可以互换使用,可以将其与OffsetsInitializer.latest()结合使用,以确保继续从之前提交的相同偏移阅读:

KafkaSource.<YourExampleClass>builder()
   ...
   .setGroupId("your-previous-group-id")
   .setStartingOffsets(OffsetsInitializer.latest())

虽然这两种方法应该“刚好有效”,但值得注意的是,您的特定管道及其使用并行性的方式可能会揭示一些differences between FlinkKafkaConsumer and the newer KafkaSource特性:
在Kafka分区的数量小于Flink的KafkaSource操作符的并行性的情况下,KafkaSource的行为不同于FlinkKafkaConsumer。

kupeojn6

kupeojn62#

当FlinkKafkaConsumer被弃用时,如何从FlinkKafkaConsumer升级到KafkaSource已经包含在Flink1.14的发行说明中https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer

相关问题