我在测试Kafka的作品,但我不知道´无法理解删除的工作原理。
我创建了一个简单的主题
retention.ms = 60000
和
segment.ms = 60000
和
cleanup.policy=delete.
在这之后,我创建了一个生产者,我发送了一些信息。消费者接收消息时没有问题。但我预计,一分钟后,如果一个重复的消费者,它不会显示消息,因为他们肯定已经被删除。但这种行为不会发生。
如果我在ksql中创建一个查询,它是相同的。消息总是出现。
我想我不明白删除是怎么回事。
例子:
1) 主题
./kafka-topics --create --zookeeper localhost:2181 --topic test --
replication-factor 2 --partitions 1 --config "cleanup.policy=delete" --
config "delete.retention.ms=60000" --config "segment.ms=60000"
2) 制作人
./kafka-avro-console-producer --broker-list broker:29092 --topic test--
property parse.key=true --property key.schema='{"type":"long"}' --property
"key.separator=:" --property value.schema='{"type": "record","name":
"ppp","namespace": "test.topic","fields": [{"name": "id","type": "long"}]}'
3) 来自生产者的消息
1:{"id": 1}
2:{"id": 2}
4:{"id": 4}
5:{"id": 5}
4) 消费者
./kafka-avro-console-consumer \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--topic test--from-beginning --property print.key=true
消费者显示四条信息。
但我预计,如果我在一分钟后再次运行consumer(我也等了更多时间,甚至几个小时),消息就不会显示了´无法显示,因为retention.ms和segment.ms是一分钟。
消息何时被真正删除?
2条答案
按热度按时间piok6c0g1#
更改
retention.ms
如ajay srivastava所述kafka-topics --zookeeper localhost:2181 --alter --topic test --config retention.ms=60000
再次测试。ou6hu8tu2#
Kafka在删除过程中要知道的另一个重要思想是
log segment file
:主题被划分为分区,对吗?这就是允许并行性、规模等的原因。。
每个分区分为
log segments files
. 为什么?因为Kafka把数据写入磁盘对吗。。。?我们不想把整个topic
/partition
在一个巨大的文件,但分裂成更小的文件(段)。。将数据拆分成更小的文件有很多好处,但与问题无关。你可以在这里读更多
这里要注意的关键是:
保留策略正在查看日志semgnet的文件时间戳。
“按时间保留是通过检查磁盘上每个日志段文件的上次修改时间(mtime)来执行的。在正常的clus-ter操作中,这是日志段关闭的时间,表示文件中最后一条消息的时间戳
(摘自《Kafka权威指南》,第26页)
版本0.10.1.0
日志保留时间不再基于日志段的上次修改时间。相反,它将基于日志段中消息的最大时间戳。
这意味着它只在关闭的日志段文件上查找。确保“段”配置参数正确。。