删除apache kafka中的主题消息

k7fdbhmy  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(589)

我在测试Kafka的作品,但我不知道´无法理解删除的工作原理。
我创建了一个简单的主题

retention.ms = 60000

segment.ms = 60000

cleanup.policy=delete.

在这之后,我创建了一个生产者,我发送了一些信息。消费者接收消息时没有问题。但我预计,一分钟后,如果一个重复的消费者,它不会显示消息,因为他们肯定已经被删除。但这种行为不会发生。
如果我在ksql中创建一个查询,它是相同的。消息总是出现。
我想我不明白删除是怎么回事。
例子:
1) 主题

./kafka-topics --create --zookeeper localhost:2181 --topic test -- 
  replication-factor 2 --partitions 1 --config "cleanup.policy=delete" -- 
  config "delete.retention.ms=60000" --config "segment.ms=60000"

2) 制作人

./kafka-avro-console-producer --broker-list broker:29092 --topic test-- 
  property parse.key=true --property key.schema='{"type":"long"}' --property 
  "key.separator=:" --property value.schema='{"type": "record","name": 
  "ppp","namespace": "test.topic","fields": [{"name": "id","type": "long"}]}'

3) 来自生产者的消息

1:{"id": 1}
 2:{"id": 2}
 4:{"id": 4}
 5:{"id": 5}

4) 消费者

./kafka-avro-console-consumer \
    --bootstrap-server broker:29092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic test--from-beginning --property print.key=true

消费者显示四条信息。
但我预计,如果我在一分钟后再次运行consumer(我也等了更多时间,甚至几个小时),消息就不会显示了´无法显示,因为retention.ms和segment.ms是一分钟。
消息何时被真正删除?

piok6c0g

piok6c0g1#

更改 retention.ms 如ajay srivastava所述 kafka-topics --zookeeper localhost:2181 --alter --topic test --config retention.ms=60000 再次测试。

ou6hu8tu

ou6hu8tu2#

Kafka在删除过程中要知道的另一个重要思想是 log segment file :
主题被划分为分区,对吗?这就是允许并行性、规模等的原因。。
每个分区分为 log segments files . 为什么?因为Kafka把数据写入磁盘对吗。。。?我们不想把整个 topic / partition 在一个巨大的文件,但分裂成更小的文件(段)。。
将数据拆分成更小的文件有很多好处,但与问题无关。你可以在这里读更多
这里要注意的关键是:
保留策略正在查看日志semgnet的文件时间戳。
“按时间保留是通过检查磁盘上每个日志段文件的上次修改时间(mtime)来执行的。在正常的clus-ter操作中,这是日志段关闭的时间,表示文件中最后一条消息的时间戳
(摘自《Kafka权威指南》,第26页)

版本0.10.1.0

日志保留时间不再基于日志段的上次修改时间。相反,它将基于日志段中消息的最大时间戳。
这意味着它只在关闭的日志段文件上查找。确保“段”配置参数正确。。

相关问题