使用flink runner时如何在apache beam中执行检查点?

zf9nrax1  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(573)

我从一个未绑定的源(Kafka)和写作它的字数到其他Kafka主题。现在我想在beam管道中执行checkpoint。我已经按照apachebeam文档中的所有说明进行了操作,但即使在这之后也没有创建checkpoint目录。
下面是我用于pipeline:-

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true

有人能帮我检查一下吗?

3j86kqsm

3j86kqsm1#

我已经研究过这个解决方案,所以一个是您可以在link cluster的flink-conf.yaml中更改checkpoint.state.dir路径,另一个是使用flinkpipelineoptions-

@Description(
                "Sets the state backend factory to use in streaming mode. "
                        + "Defaults to the flink cluster's state.backend configuration.")
        Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
        void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

通过设置setstatebackendfactory(我已经使用自定义类)

static class  bakend implements FlinkStateBackendFactory{

        @Override
        public StateBackend createStateBackend(FlinkPipelineOptions options) {
            return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");

        }
    }

这将创建一个checkpointdir。您还需要设置checkpointinginterval的值,以便启用检查点。

相关问题