重新运行后flink状态为空(重新初始化)

vfwfrxfs  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(777)

我试着连接两条流,第一条是坚持 MapValueState : RocksDB 将数据保存在检查点文件夹中,但在新运行之后, state 是空的。我在本地和flink集群中运行它,在集群中取消提交,然后在本地重新运行

env.setStateBackend(new RocksDBStateBackend(..)
 env.enableCheckpointing(1000)
 ...

   val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
  .keyBy(_.id)

 val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
    .keyBy(_.id)

productDescriptionStream
  .connect(productStockStream)
  .process(ProductProcessor())
  .setParallelism(1)

env.execute("Product aggregator")

产品处理器

case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product]{
private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
new MapStateDescriptor[String, ProductDescription](
  "productDescription",
  createTypeInformation[String],
  createTypeInformation[ProductDescription]
)
private[this] lazy val states: MapState[String, ProductDescription] = getRuntimeContext.getMapState(stateDescriptor)

override def processElement1(value: ProductDescription,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,out: Collector[Product]
 ): Unit = {
  states.put(value.id, value)
 }}

 override def processElement2(value: ProductStock,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]
 ): Unit = {
  if (states.contains(value.id)) {
         val product =Product(
          id = value.id,
          description = Some(states.get(value.id).description),
          stock = Some(value.stock),
          updatedAt = value.updatedAt)
        out.collect(product )
 }}
44u64gxh

44u64gxh1#

flink创建检查点是为了从故障中恢复,而不是在手动关机后恢复。取消作业时,flink的默认行为是删除检查点。既然工作不能再失败了,就不需要恢复了。
您有几种选择:
(1) 将检查点配置为在取消作业时保留检查点:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

然后,当您重新启动作业时,您需要指示您希望它从特定的检查点重新启动:

flink run -s <checkpoint-path> ...

否则,无论何时启动作业,它都将以空状态后端开始。
(2) 使用stop with savepoint代替取消作业:

flink stop [-p targetDirectory] [-d] <jobID>

之后你需要再次使用 flink run -s ... 从保存点恢复。
与依赖最近的检查点相比,使用保存点停止是一种更干净的方法。
(3) 或者您可以使用ververicaplatformcommunityedition,它将抽象级别提高到不必自己管理这些细节的程度。

相关问题