java 这是一个有效的Flink StateTtlConfig吗?

xxhby3vn  于 2023-04-19  发布在  Java
关注(0)|答案(1)|浏览(99)

通过阅读官方文档,我不清楚是否可以合并不同的清理选项,在本例中是cleanupInRocksdbCompactFiltercleanupFullSnapshot

StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(3))
                .cleanupInRocksdbCompactFilter(1000)
                .cleanupFullSnapshot()
                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
a11xaf1n

a11xaf1n1#

我给你三个建议

根据Apache Flink官方文档,StateTtlConfig一次只能有一个cleanup选项。因此,在您提供的示例中,将cleanupInRocksdbCompactFilter和cleanupFullSnapshot选项组合在一起是无效的。
cleanupInRocksdbCompactFilter选项通常效率更高,因为它只在RocksDB压缩期间清理过期状态,而cleanupFullSnapshot选项会获取状态的完整快照并一次性删除所有过期状态。
因此,例如,如果您有大量的状态,并且希望最大限度地减少对性能的影响,则可以使用cleanupInRocksdbCompactFilter选项,并将cleanupInRocksdbCompactFilter值设置为较高。另一方面,如果您的状态数量较少,并且需要确保尽快删除过期状态,则可以使用cleanupFullSnapshot选项。

第一种方式-

下面是使用cleanupInRocksdbCompactFilter选项的有效StateTtlConfig示例:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.minutes(3))
    .cleanupInRocksdbCompactFilter(1000)
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

第二种方式-

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofMinutes(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()
    .cleanupInBackground(Duration.ofMinutes(1))
    .build();

在这个例子中,我们使用Duration类代替了过时的Time类。我们还将UpdateType设置为OnCreateAndWrite,这意味着当新值写入状态和初始创建状态时,将检查TTL。我们还使用StateVisibility.NeverReturnExpired来确保Flink作业永远不会返回过期的状态值。
此外,我们同时使用cleanupFullSnapshot()cleanupInBackground(Duration.ofMinutes(1))来确保在完整快照期间和定期在后台有效清理过期状态。这有助于最大限度地减少对作业性能的影响,同时仍然保持有效的状态清理。
总的来说,这个例子为Flink StateTtlConfig提供了一个更健壮和专业的配置。

第三种方式-

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(3))
    // Only keep data in RocksDB for 1000 records
    .cleanupInRocksdbCompactFilter(1000)
    // Take full snapshot of state to remove expired keys
    .cleanupFullSnapshot()
    // Update TTL on both read and write operations
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    // Never return expired state
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

在这里,我假设所需的Flink库已经导入,并且此代码正在有效的Flink上下文中使用。

相关问题