有没有办法在每次运行前删除某个主题中的所有数据或删除该主题?

8mmmxcuj  于 2021-06-07  发布在  Kafka
关注(0)|答案(13)|浏览(323)

有没有办法删除某个主题中的所有数据或在每次运行前删除该主题?
我可以修改kafkanconfig.scala文件来更改 logRetentionHours 财产?有没有一种方法可以让消费者一看到这些信息就立即将其删除?
我正在使用生产者从某处获取数据,并将数据发送到消费者使用的特定主题,我可以在每次运行时删除该主题中的所有数据吗?每次在主题中我只需要新数据。有没有办法重新初始化这个主题?

fnvucqvd

fnvucqvd1#

从Kafka2.3.0版本开始,有一种软删除Kafka的替代方法(不推荐使用旧方法)。
将retention.ms更新为1秒(1000ms),然后在一分钟后再次将其设置为默认设置,即7天(168小时,604800000毫秒)
柔软的deletion:- (rentation.ms=1000)(使用kafka configs.sh)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

设置为default:- 7 天(168小时,保留时间ms=604800000)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
ffdz8vbo

ffdz8vbo2#

Kafka0.10测试

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

注意:如果您正在删除kafka日志中的主题文件夹,但不是从zookeeper数据文件夹中删除,那么您将看到主题仍然存在。

iswrvxsc

iswrvxsc3#

停止Zookeeper和Kafka
在server.properties中,更改log.retention.hours值。你可以评论 log.retention.hours 并添加 log.retention.ms=1000 . 它将保持Kafka主题的记录只有一秒钟。
启动zookeeper和kafka。
检查用户控制台。当我第一次打开控制台时,记录就在那里。但当我再次打开控制台时,记录被删除了。
稍后,您可以设置 log.retention.hours 你想要的身材。

7eumitmz

7eumitmz4#

我们尝试了其他答案所描述的方法,取得了中等程度的成功。真正对我们有用的是class命令(apachekafka0.8.1)
sh kafka-run-class.sh kafka.admin.deletetopiccommand--topic yourtopic--zookeeperlocalhost:2181

cngwdvgl

cngwdvgl5#

在从kafka集群中手动删除主题时,您只需检查一下https://github.com/darrenfu/bigdata/issues/6 在大多数解决方案中,一个关键的步骤遗漏了很多,那就是删除 /config/topics/<topic_name> 在zk。

4nkexdtk

4nkexdtk6#

我想还没有人支持。看看jira的这个问题“add-delete-topic-support”。
要手动删除:
关闭群集
清除kafka日志目录(由 log.dir 属性)以及zookeeper数据
重新启动群集
对于任何给定的主题,你能做的就是
阻止Kafka
清除特定于分区的kafka日志,kafka以“logdir/topic partition”格式存储其日志文件,因此对于名为“mytopic”的主题,分区id 0的日志将存储在 /tmp/kafka-logs/MyTopic-0 哪里 /tmp/kafka-logs 是由 log.dir 属性
重启Kafka
这是 NOT 这是一个很好的推荐方法,但应该有效。在kafka代理配置文件中 log.retention.hours.per.topic 属性用于定义 The number of hours to keep a log file before deleting it for some specific topic 另外,有没有一种方法可以让消费者一看到这些信息就立即将其删除?
Kafka文献:
kafka集群保留所有已发布的消息,无论它们是否已在可配置的时间段内被使用。例如,如果日志保留时间设置为两天,则在消息发布后的两天内,它可供使用,之后将被丢弃以释放空间。kafka的性能在数据大小方面实际上是恒定的,因此保留大量数据不是问题。
事实上,在每个使用者的基础上保留的唯一元数据是使用者在日志中的位置,称为“偏移量”。此偏移量由使用者控制:通常,使用者在读取消息时会线性地提前偏移量,但实际上位置由使用者控制,它可以按自己喜欢的任何顺序使用消息。例如,使用者可以重置为旧的偏移量以重新处理。
对于在Kafka0.8中寻找开始偏移量的简单消费者示例,他们说
Kafka包括两个常数来帮助, kafka.api.OffsetRequest.EarliestTime() 在日志中找到数据的开头并从那里开始流式传输, kafka.api.OffsetRequest.LatestTime() 将仅流式传输新消息。
您还可以在那里找到管理消费端偏移的示例代码。

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
zpf6vheq

zpf6vheq7#

作为一个棘手的解决方法,您可以调整每个主题的运行时保留设置,例如。 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 (retention.bytes=0也可以工作)
过了一会儿,Kafka应该腾出空间。与重新创建主题相比,不确定这是否有任何意义。
另外,最好把保留设置带回来,一旦Kafka完成清洁。
你也可以使用 retention.ms 保存历史数据

1wnzp6jl

1wnzp6jl8#

正如我在这里提到的,清除Kafka队列:
在kafka 0.8.2中测试,对于快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:

delete.topic.enable=true

然后,可以运行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
gab6jxml

gab6jxml9#

以下是清空和删除kafka主题的脚本,假设localhost作为zookeeper服务器,kafka\u home设置为安装目录:
下面的脚本将清空主题,方法是将其保留时间设置为1秒,然后删除配置:


# !/bin/bash

echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

要完全删除主题,必须停止任何适用的kafka代理并从kafka日志目录(默认值:/tmp/kafka logs)中删除其目录,然后运行此脚本从zookeeper中删除主题。要验证是否已从zookeeper中删除,ls/brokers/topics的输出不应再包含以下主题:


# !/bin/bash

echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
jyztefdp

jyztefdp10#

我在集成测试运行后使用下面的实用程序进行清理。
它使用最新的 AdminZkClient 应用程序编程接口。旧的api已被弃用。

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }

}

有一个选项“删除主题”。但是,它标记了要删除的主题。zookeeper稍后会删除该主题。因为这可能是不可预测的长,我更喜欢retention.ms方法

ecfsfe2w

ecfsfe2w11#

适用于brew用户

如果你用的是 brew 像我一样,浪费了很多时间寻找臭名昭著的 kafka-logs 文件夹,不要再害怕了(请让我知道这是否适用于您和多个不同版本的自制、Kafka等:))
你可能会在下面找到它:

地点: /usr/local/var/lib/kafka-logs ####如何真正找到那条路

(这基本上对通过brew安装的每个应用程序都很有帮助)

  1. brew services list kafka启动matbhz/users/matbhz/library/launchagents/homebrew.mxcl.kafka.plist
    2) 打开看看 plist 你在上面找到的
    3) 找到定义 server.properties 位置打开它,在我的情况下: /usr/local/etc/kafka/server.properties 4) 找那个 log.dirs 生产线:
    log.dirs=/usr/local/var/lib/kafka日志
    5) 转到该位置并删除所需主题的日志
    6) 重新开始Kafka brew services restart kafka
inn6fuwd

inn6fuwd12#

我使用这个脚本:


# !/bin/bash

topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done
s71maibg

s71maibg13#

关于主题及其分区的所有数据都存储在 tmp/kafka-logs/ . 此外,它们以某种格式存储 topic-partionNumber ,因此如果要删除主题 newTopic ,您可以:
阻止Kafka
删除文件 rm -rf /tmp/kafka-logs/newTopic-*

相关问题