使用Sping Boot 重新启动apche flink后无法恢复状态

pn9klfpd  于 2023-08-01  发布在  Apache
关注(0)|答案(1)|浏览(148)
Configuration configuration = new Configuration();
            configuration.setBoolean("state.backend.local-recovery", true);
            configuration.setString("state.checkpoints.dir", rcokDbStorageLocation);
            configuration.setString("state.checkpoints.num-retained", "20");
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
            CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
            environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));
            checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            environment.getConfig().enableObjectReuse();
            environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            environment.setStateBackend((StateBackend)new RocksDBStateBackend(rcokDbStorageLocation).configure(configuration));
            environment.getConfig().setGlobalJobParameters(configuration);
            FlinkKafkaConsumer011<String> flinkKafkaConsumer2 = createStringConsumerForTopic(Constants.KAFKA_TOPIC_FLINK_LOGIN_FAILED);
            DataStream<String> stringInputStream2 = environment.addSource(flinkKafkaConsumer2);

            DataStream<UserLoginRequest> userLoginWithPinRequestDataStream = stringInputStream2.flatMap(new JsonDeserializer<>(UserLoginRequest.class)).returns(UserLoginRequest.class);
            userLoginWithPinRequestDataStream
                    .keyBy(UserLoginRequest::getUser_id)
                    .process(new WrongLoginProcessor())
                    .name(AppConstants.MONITOR_NAME.LOGIN_WITH_PIN);
            environment.execute();

字符串
这是我在重启Sping Boot 应用程序时的配置,我没有获取以前的状态,这里有什么问题?
例如,我在rockdb中有2个状态,然后重新启动后,这20个状态应该从rockdb中恢复

sauutmhj

sauutmhj1#

默认情况下,每次使用Flink应用程序时,作业开始时都没有任何状态.为了使用从早期作业中保存的状态重新启动应用程序,您需要通过指定保存点(或检查点)的位置来显式地实现这一点。
文档中介绍了如何使用CLI从保存点(或保留的检查点)重新启动作业。
举个例子:

$ ./bin/flink run \
      --detached \ 
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./examples/streaming/StateMachineExample.jar

字符串
同样的事情也可以从REST API中完成。
有关更完整的教程,请参阅Flink Operations Playground中关于升级和重新缩放作业的部分。
有关使用保存点和检查点可以安全地完成哪些操作任务的更多信息,请参见Checkpoints vs Savepoints

相关问题