Flink在某个超时后显式释放状态

bt1cpqcv  于 2023-05-15  发布在  Apache
关注(0)|答案(2)|浏览(121)

我有一个值状态,它存储了一些计算数据,这些数据将被频繁访问,计算起来相当昂贵,因此我使用值状态在一个键控的进程函数中缓存这些信息。
从Flink文档来看,我认为设置TTL并没有显式地清理,但它是如此懒惰。这就出现了一个问题,因为我有大量的数据处于状态,导致作业耗尽内存。
有没有一种方法可以明确地解放国家?类似于如果状态超过10分钟没有被读取,则将其从RAM中释放。

km0tfn4u

km0tfn4u1#

您可以参考“清除过期状态”一节:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state
对于堆状态后端,您可以设置增量清理。但不是一个固定的超时。
另一个选项是增量地触发一些状态条目的清除。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果此清理策略对于特定状态是活动的,则存储后端在其所有条目上为该状态保留惰性全局迭代器。每次触发增量清理时,迭代器都会前进。检查遍历的状态条目并清除过期的状态条目。
可以在StateTtlConfig中配置此功能:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
   .newBuilder(Time.seconds(1))
   .cleanupIncrementally(10, true)
   .build

对于RocksDB状态后端,您可以在RocksDB压缩过滤器中配置清理。它也有自己的警告。
如果使用RocksDB状态后端,将调用Flink特定的压缩过滤器进行后台清理。RocksDB定期运行异步压缩来合并状态更新并减少存储。Flink压缩过滤器使用TTL检查状态条目的过期时间戳,并排除过期值。
可以在StateTtlConfig中配置此功能:

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build
pbgvytdp

pbgvytdp2#

您可以在键控的进程函数中设置键控的计时器,并在计时器触发时显式地调用clear()

相关问题