Configuration configuration = new Configuration();
configuration.setBoolean("state.backend.local-recovery", true);
configuration.setString("state.checkpoints.dir", rcokDbStorageLocation);
configuration.setString("state.checkpoints.num-retained", "20");
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getConfig().enableObjectReuse();
environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
environment.setStateBackend((StateBackend)new RocksDBStateBackend(rcokDbStorageLocation).configure(configuration));
environment.getConfig().setGlobalJobParameters(configuration);
FlinkKafkaConsumer011<String> flinkKafkaConsumer2 = createStringConsumerForTopic(Constants.KAFKA_TOPIC_FLINK_LOGIN_FAILED);
DataStream<String> stringInputStream2 = environment.addSource(flinkKafkaConsumer2);
DataStream<UserLoginRequest> userLoginWithPinRequestDataStream = stringInputStream2.flatMap(new JsonDeserializer<>(UserLoginRequest.class)).returns(UserLoginRequest.class);
userLoginWithPinRequestDataStream
.keyBy(UserLoginRequest::getUser_id)
.process(new WrongLoginProcessor())
.name(AppConstants.MONITOR_NAME.LOGIN_WITH_PIN);
environment.execute();
字符串
这是我在重启Sping Boot 应用程序时的配置,我没有获取以前的状态,这里有什么问题?
例如,我在rockdb中有2个状态,然后重新启动后,这20个状态应该从rockdb中恢复
1条答案
按热度按时间sauutmhj1#
默认情况下,每次使用Flink应用程序时,作业开始时都没有任何状态.为了使用从早期作业中保存的状态重新启动应用程序,您需要通过指定保存点(或检查点)的位置来显式地实现这一点。
文档中介绍了如何使用CLI从保存点(或保留的检查点)重新启动作业。
举个例子:
字符串
同样的事情也可以从REST API中完成。
有关更完整的教程,请参阅Flink Operations Playground中关于升级和重新缩放作业的部分。
有关使用保存点和检查点可以安全地完成哪些操作任务的更多信息,请参见Checkpoints vs Savepoints。