Flink 当作业管理器重新启动时,无法恢复PVC上的RocksDb状态

lf5gs5x2  于 2023-08-01  发布在  Apache
关注(0)|答案(1)|浏览(152)

我正在使用KeyedCoProcess函数连接Kafka的2个流,并将它们持久化在Flink Value State中,这两个流都有一个公共密钥(用于访问状态)。消息到达第一个Kafka主题,然后通过key将它们持久化到状态中,当来自其他主题的消息以相同的key到达时,它在状态中被查找,并被丰富和转发到另一个Kafka主题。我正在使用RocksDbStateBackend将状态存储到PVC上。我面临的问题是,如果作业管理器重新启动,整个状态将丢失。除此之外,过期状态也没有从PVC中清除。附加代码示例和配置。
作业配置

EmbeddedRocksDBStateBackend embeddedRocksDb = new EmbeddedRocksDBStateBackend();
    embeddedRocksDb.setDbStoragePath("path/to/pvc/mount_path");
    env.setStateBackend(embeddedRocksDb);
    env.getCheckpointConfig().setCheckpointStorage("checkpointing-location"));
    env.enableCheckpointing(1000))));

字符串
PVC配置

podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
          volumeMounts:
            - name: efs-claim
              mountPath: "/data/flink/state"
      imagePullSecrets:
        - name: regcred
      volumes:
        - name: efs-claim
          persistentVolumeClaim:
            claimName: efs-claim


状态配置

private ValueStateDescriptor<XX>> getValueDesc() {
        StateTtlConfig timeToLive = StateTtlConfig
                .newBuilder(Time.minutes(1440))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInRocksdbCompactFilter(this.queryTimeAfterNumEntries)
                .build();
        descriptor.enableTimeToLive(timeToLive);
        return descriptor;
    }


我没有使用保存点,作业的拓扑在重新启动前后保持不变。请帮忙。如果有其他信息需要的话,请告诉我。先谢谢你。

hkmswyz6

hkmswyz61#

在Speedb hive中被问到,这是我给你的答案:
“我能想到的唯一原因,rocksdb数据将丢失是在没有WAL的情况下写入。我们可以查看rocksdb日志来确保,但您需要在Flink支持中寻求帮助”
你试过在Slack上的Flink服务器上问吗?如果你想给我们发送你的日志,加入Hive here并回复你的问题-

相关问题