我正试图删除Kafka的旧消息,它没有按预期工作。我还配置了kafka rentension.ms、log.cleanup属性。但它并没有在5分钟后删除旧消息。这里是配置和5分钟内新的消息也发表了即使旧的记录在Kafka主题。你能帮我找出这个配置中我遗漏了什么吗?。因为它增加了存储成本。
-config retention.bytes=-1--config cleanup.policy=delete--config retention.ms=300000
我正试图删除Kafka的旧消息,它没有按预期工作。我还配置了kafka rentension.ms、log.cleanup属性。但它并没有在5分钟后删除旧消息。这里是配置和5分钟内新的消息也发表了即使旧的记录在Kafka主题。你能帮我找出这个配置中我遗漏了什么吗?。因为它增加了存储成本。
-config retention.bytes=-1--config cleanup.policy=delete--config retention.ms=300000
1条答案
按热度按时间zlhcx6iw1#
首先,重要的是要理解
LogCleaner
将只删除主题的旧段上的数据,如的配置说明中所述cleanup.policy
:“delete”或“compact”或两者兼有的字符串。此字符串指定要在旧日志段上使用的保留策略。“
很可能您的所有数据仍在一个段中,因此您需要减少
segment.bytes
为您的主题进行配置,以便您实际获得“旧”片段。此配置默认为1gb,描述如下:“此配置控制日志的段文件大小。保留和清理总是一次对一个文件执行,因此较大的段大小意味着更少的文件,但对保留的粒度控制更少。”
如果您不想等到某个段被填满了,也可以随意减少配置
segment.ms
从默认值7天到更适合您的情况。此配置描述为:“此配置控制kafka强制日志滚动的时间段(即使段文件未满),以确保保留可以删除或压缩旧数据。”
如果您的保留时间很短,比如5分钟,那么您可能还需要减少代理范围的配置
log.cleaner.delete.retention.ms
从默认值1天更改为较低的值。此配置描述为:“删除记录保留多长时间?”