我使用ApacheKafka流媒体对kafka主题中使用的数据进行聚合。然后,聚合被序列化到另一个主题,本身被消耗,结果存储在数据库中。我想是相当经典的用例吧。
聚合调用的结果是创建一个由kafka changelog“topic”备份的ktable。
这比实际情况更复杂,但假设它存储给定键的事件计数和总和(以计算平均值):
KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...)
changelog“topic”似乎没有设置保留期(根据全局保留设置,我没有看到它与其他主题相反“过期”)。
这实际上是好的/必要的,因为这样可以避免在将来的事件具有相同的键时丢失聚合状态。
然而,从长远来看,这意味着这个变更日志将永远增长(随着更多的键进入)?我确实可能有很多密钥(我的聚合没有count/sum那么小)。
因为我有办法知道我不会再得到某个特定密钥的事件(有些事件被标记为“final”),有没有办法剥离changelog的这些特定密钥的聚合状态,以避免它永远增长,因为我不再需要它们,可能会有一点延迟“just”以防万一?
或者也许有一种方法可以完全不同于Kafka流媒体来避免这个“问题”?
1条答案
按热度按时间tjvv9vkg1#
是:changelog主题配置为日志压缩,而不是保留时间。如果您收到“final”记录,聚合就可以返回
null
作为聚合结果。这将从本地rocksdb存储以及基础changelog主题中删除它。