我正在使用KeyedCoProcess函数连接Kafka的2个流,并将它们持久化在Flink Value State中,这两个流都有一个公共密钥(用于访问状态)。消息到达第一个Kafka主题,然后通过key将它们持久化到状态中,当来自其他主题的消息以相同的key到达时,它在状态中被查找,并被丰富和转发到另一个Kafka主题。我正在使用RocksDbStateBackend将状态存储到PVC上。我面临的问题是,如果作业管理器重新启动,整个状态将丢失。除此之外,过期状态也没有从PVC中清除。附加代码示例和配置。
作业配置
EmbeddedRocksDBStateBackend embeddedRocksDb = new EmbeddedRocksDBStateBackend();
embeddedRocksDb.setDbStoragePath("path/to/pvc/mount_path");
env.setStateBackend(embeddedRocksDb);
env.getCheckpointConfig().setCheckpointStorage("checkpointing-location"));
env.enableCheckpointing(1000))));
字符串
PVC配置
podTemplate:
spec:
containers:
- name: flink-main-container
env:
volumeMounts:
- name: efs-claim
mountPath: "/data/flink/state"
imagePullSecrets:
- name: regcred
volumes:
- name: efs-claim
persistentVolumeClaim:
claimName: efs-claim
型
状态配置
private ValueStateDescriptor<XX>> getValueDesc() {
StateTtlConfig timeToLive = StateTtlConfig
.newBuilder(Time.minutes(1440))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(this.queryTimeAfterNumEntries)
.build();
descriptor.enableTimeToLive(timeToLive);
return descriptor;
}
型
我没有使用保存点,作业的拓扑在重新启动前后保持不变。请帮忙。如果有其他信息需要的话,请告诉我。先谢谢你。
1条答案
按热度按时间hkmswyz61#
在Speedb hive中被问到,这是我给你的答案:
“我能想到的唯一原因,rocksdb数据将丢失是在没有WAL的情况下写入。我们可以查看rocksdb日志来确保,但您需要在Flink支持中寻求帮助”
你试过在Slack上的Flink服务器上问吗?如果你想给我们发送你的日志,加入Hive here并回复你的问题-