我在使用kstream连接时遇到问题。我所做的是从一个主题中分离出3种不同类型的消息到新的流。然后用两个流做一个innerjoin,这两个流创建了另一个流,最后我用新的流和剩下的最后一个流做最后一个leftjoin。
连接的窗口时间为30秒。
这样做是为了过滤掉一些被其他消息覆盖的消息。
我在kubernetes上运行这个应用程序,pod的磁盘空间无限增长,直到pod崩溃。
我意识到这是因为join将数据本地存储在tmp/kafka streams目录中。
这些目录称为:kstream jointhis。。。kstream outerother。。
它存储来自rocksdb的sst文件,并且这些文件无限期地增长。
我的理解是,因为我使用的窗口时间为30秒,这些应该是冲出来后,一定的时间,但不是。
我还将window\u store\u change\u log\u additional\u retention\u ms\u config更改为10分钟,以查看是否发生了实际情况不同的更改。
我要明白这是怎么改变的。
1条答案
按热度按时间w46czmvw1#
窗口大小并不决定您的存储需求,而是决定联接的保留期。为了处理无序记录,数据的存储时间比窗口大小长(默认保留时间为1天)。
您可以通过传入来减少保留时间
Materialized.as(null).withRetention(...)
进入你的join(...)
操作员。在旧的Kafka版本中,您可以通过窗口定义指定保留期,例如,
JoinWindows.of(...).until(...)
.配置
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
配置数据在群集中存储的时间。因此,您可能也想减少它,但这无助于减少客户端存储需求。