我想在kubernetes上运行ApacheFlink(1.11.1)流应用程序。文件系统状态后端保存到s3。s3的检查点正在工作
args:
- "standalone-job"
- "-s"
- "s3://BUCKET_NAME/34619f2862ce3e5fc91d80eae13a434a/chk-4/_metadata"
- "--job-classname"
- "com.abc.def.MY_JOB"
- "--kafka-broker"
- "KAFKA_HOST:9092"
所以我面临的问题是:
我必须手动选择上一个状态目录。有没有可能做得更好?
作业增加chk dir,但不使用检查点。意思是当我第一次看到一个事件时抛出一个新事件并将其存储到 ListState<String>
每当我通过gitlab部署更新版本的应用程序时,它都会再次抛出此事件。
当我定义了文件系统的state.backend时,为什么必须在代码中显式启用检查点? env.enableCheckpointing(Duration.ofSeconds(60).toMillis());
以及 env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
2条答案
按热度按时间krugob8w1#
如果使用ververicaplatform:communityedition,您可能会更高兴,它将抽象级别提高到不必在这个级别处理细节的程度。它有一个api,设计时考虑了ci/cd。
我不确定我是否理解你的第二点,但在恢复过程中,你的工作会倒带并重新处理一些数据是正常的。flink不保证只处理一次,而是保证只处理一次语义:每个事件只影响flink管理的状态一次。这是通过回滚到最近的检查点中的偏移量来完成的,并将所有其他状态回滚到消耗所有数据后的状态,直到这些偏移量为止。
在作业运行时,有一个状态后端作为存储作业工作状态的位置是必要的。如果不启用检查点,则工作状态将不会被检查点,并且无法恢复。但是,从Flink1.11开始,您可以通过配置文件启用检查点,使用
f0brbegy2#
有几种方法可以将工作负载部署到kubernetes、简单yaml文件、helmchart和operator。
升级一个有状态的flink作业并不像升级一个无状态的服务那么简单,您只需要更新二进制文件并重新启动。
升级flink作业你需要先取一个保存点或获取最新的checkpoint dir,然后更新二进制文件,最后重新提交你的作业,在这种情况下,我认为简单的yaml文件和helm图表不能帮助你实现这一点,你应该考虑实现一个flink操作符来做升级作业。
https://github.com/googlecloudplatform/flink-on-k8s-operator