有没有办法删除某个主题中的所有数据或在每次运行前删除该主题?
我可以修改kafkanconfig.scala文件来更改 logRetentionHours
财产?有没有一种方法可以让消费者一看到这些信息就立即将其删除?
我正在使用生产者从某处获取数据,并将数据发送到消费者使用的特定主题,我可以在每次运行时删除该主题中的所有数据吗?每次在主题中我只需要新数据。有没有办法重新初始化这个主题?
有没有办法删除某个主题中的所有数据或在每次运行前删除该主题?
我可以修改kafkanconfig.scala文件来更改 logRetentionHours
财产?有没有一种方法可以让消费者一看到这些信息就立即将其删除?
我正在使用生产者从某处获取数据,并将数据发送到消费者使用的特定主题,我可以在每次运行时删除该主题中的所有数据吗?每次在主题中我只需要新数据。有没有办法重新初始化这个主题?
13条答案
按热度按时间fnvucqvd1#
从Kafka2.3.0版本开始,有一种软删除Kafka的替代方法(不推荐使用旧方法)。
将retention.ms更新为1秒(1000ms),然后在一分钟后再次将其设置为默认设置,即7天(168小时,604800000毫秒)
柔软的deletion:- (rentation.ms=1000)(使用kafka configs.sh)
设置为default:- 7 天(168小时,保留时间ms=604800000)
ffdz8vbo2#
Kafka0.10测试
注意:如果您正在删除kafka日志中的主题文件夹,但不是从zookeeper数据文件夹中删除,那么您将看到主题仍然存在。
iswrvxsc3#
停止Zookeeper和Kafka
在server.properties中,更改log.retention.hours值。你可以评论
log.retention.hours
并添加log.retention.ms=1000
. 它将保持Kafka主题的记录只有一秒钟。启动zookeeper和kafka。
检查用户控制台。当我第一次打开控制台时,记录就在那里。但当我再次打开控制台时,记录被删除了。
稍后,您可以设置
log.retention.hours
你想要的身材。7eumitmz4#
我们尝试了其他答案所描述的方法,取得了中等程度的成功。真正对我们有用的是class命令(apachekafka0.8.1)
sh kafka-run-class.sh kafka.admin.deletetopiccommand--topic yourtopic--zookeeperlocalhost:2181
cngwdvgl5#
在从kafka集群中手动删除主题时,您只需检查一下https://github.com/darrenfu/bigdata/issues/6 在大多数解决方案中,一个关键的步骤遗漏了很多,那就是删除
/config/topics/<topic_name>
在zk。4nkexdtk6#
我想还没有人支持。看看jira的这个问题“add-delete-topic-support”。
要手动删除:
关闭群集
清除kafka日志目录(由
log.dir
属性)以及zookeeper数据重新启动群集
对于任何给定的主题,你能做的就是
阻止Kafka
清除特定于分区的kafka日志,kafka以“logdir/topic partition”格式存储其日志文件,因此对于名为“mytopic”的主题,分区id 0的日志将存储在
/tmp/kafka-logs/MyTopic-0
哪里/tmp/kafka-logs
是由log.dir
属性重启Kafka
这是
NOT
这是一个很好的推荐方法,但应该有效。在kafka代理配置文件中log.retention.hours.per.topic
属性用于定义The number of hours to keep a log file before deleting it for some specific topic
另外,有没有一种方法可以让消费者一看到这些信息就立即将其删除?Kafka文献:
kafka集群保留所有已发布的消息,无论它们是否已在可配置的时间段内被使用。例如,如果日志保留时间设置为两天,则在消息发布后的两天内,它可供使用,之后将被丢弃以释放空间。kafka的性能在数据大小方面实际上是恒定的,因此保留大量数据不是问题。
事实上,在每个使用者的基础上保留的唯一元数据是使用者在日志中的位置,称为“偏移量”。此偏移量由使用者控制:通常,使用者在读取消息时会线性地提前偏移量,但实际上位置由使用者控制,它可以按自己喜欢的任何顺序使用消息。例如,使用者可以重置为旧的偏移量以重新处理。
对于在Kafka0.8中寻找开始偏移量的简单消费者示例,他们说
Kafka包括两个常数来帮助,
kafka.api.OffsetRequest.EarliestTime()
在日志中找到数据的开头并从那里开始流式传输,kafka.api.OffsetRequest.LatestTime()
将仅流式传输新消息。您还可以在那里找到管理消费端偏移的示例代码。
zpf6vheq7#
作为一个棘手的解决方法,您可以调整每个主题的运行时保留设置,例如。
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
(retention.bytes=0也可以工作)过了一会儿,Kafka应该腾出空间。与重新创建主题相比,不确定这是否有任何意义。
另外,最好把保留设置带回来,一旦Kafka完成清洁。
你也可以使用
retention.ms
保存历史数据1wnzp6jl8#
正如我在这里提到的,清除Kafka队列:
在kafka 0.8.2中测试,对于快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:
然后,可以运行以下命令:
gab6jxml9#
以下是清空和删除kafka主题的脚本,假设localhost作为zookeeper服务器,kafka\u home设置为安装目录:
下面的脚本将清空主题,方法是将其保留时间设置为1秒,然后删除配置:
要完全删除主题,必须停止任何适用的kafka代理并从kafka日志目录(默认值:/tmp/kafka logs)中删除其目录,然后运行此脚本从zookeeper中删除主题。要验证是否已从zookeeper中删除,ls/brokers/topics的输出不应再包含以下主题:
jyztefdp10#
我在集成测试运行后使用下面的实用程序进行清理。
它使用最新的
AdminZkClient
应用程序编程接口。旧的api已被弃用。有一个选项“删除主题”。但是,它标记了要删除的主题。zookeeper稍后会删除该主题。因为这可能是不可预测的长,我更喜欢retention.ms方法
ecfsfe2w11#
适用于brew用户
如果你用的是
brew
像我一样,浪费了很多时间寻找臭名昭著的kafka-logs
文件夹,不要再害怕了(请让我知道这是否适用于您和多个不同版本的自制、Kafka等:))你可能会在下面找到它:
地点:
/usr/local/var/lib/kafka-logs
####如何真正找到那条路(这基本上对通过brew安装的每个应用程序都很有帮助)
brew services list
kafka启动matbhz/users/matbhz/library/launchagents/homebrew.mxcl.kafka.plist2) 打开看看
plist
你在上面找到的3) 找到定义
server.properties
位置打开它,在我的情况下:/usr/local/etc/kafka/server.properties
4) 找那个log.dirs
生产线:log.dirs=/usr/local/var/lib/kafka日志
5) 转到该位置并删除所需主题的日志
6) 重新开始Kafka
brew services restart kafka
inn6fuwd12#
我使用这个脚本:
s71maibg13#
关于主题及其分区的所有数据都存储在
tmp/kafka-logs/
. 此外,它们以某种格式存储topic-partionNumber
,因此如果要删除主题newTopic
,您可以:阻止Kafka
删除文件
rm -rf /tmp/kafka-logs/newTopic-*