我有一个使用旧Flink Kafka Consumer(FlinkKafkaConsumer
)运行的作业,现在我想将其迁移到KafkaSource
。但我不确定此迁移会产生什么影响。我希望作业从旧FlinkKafkaConsumer最近成功执行的检查点开始,这可能吗?如果不可能,我应该如何正确地迁移Kafka Consumer?
我有一个使用旧Flink Kafka Consumer(FlinkKafkaConsumer
)运行的作业,现在我想将其迁移到KafkaSource
。但我不确定此迁移会产生什么影响。我希望作业从旧FlinkKafkaConsumer最近成功执行的检查点开始,这可能吗?如果不可能,我应该如何正确地迁移Kafka Consumer?
2条答案
按热度按时间pdsfdshx1#
假设配置相同,只要之前的使用者组ID配置与之前实现使用的组ID配置匹配,这两个配置应该可以互换使用,可以将其与
OffsetsInitializer.latest()
结合使用,以确保继续从之前提交的相同偏移阅读:虽然这两种方法应该“刚好有效”,但值得注意的是,您的特定管道及其使用并行性的方式可能会揭示一些differences between
FlinkKafkaConsumer
and the newerKafkaSource
特性:在Kafka分区的数量小于Flink的KafkaSource操作符的并行性的情况下,KafkaSource的行为不同于FlinkKafkaConsumer。
kupeojn62#
当FlinkKafkaConsumer被弃用时,如何从FlinkKafkaConsumer升级到KafkaSource已经包含在Flink1.14的发行说明中https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer