如何删除消费者已经消费的数据?Kafka

yptwkmov  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(1024)

我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天的大小达到5GB。为了解决这个问题,ı 要立即删除已处理的数据。我在adminclient中使用delete record方法来删除偏移量。但是当我查看日志文件时,与该偏移量对应的数据并没有被删除。

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

我不需要这样的建议(log.retention.hours,log.retention.bytes,log.segment.bytes,log.cleanup.policy=delete)
因为我只想删除消费者消费的数据。在这个解决方案中,我还删除了未使用的数据。
你有什么建议?

js4nwp54

js4nwp541#

你没做错什么。你提供的代码工作正常,我已经测试过了。万一我忽略了你代码中的某些内容,我的代码是:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

我使用了group:'org.apache.kafka',name:'kafka clients',version:'2.0.0'
因此,请检查是否以正确的分区为目标(第一个分区为0)
检查您的代理版本:https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/adminclient.html 说:
版本为0.11.0.0的代理支持此操作
从同一个应用程序生成消息,以确保连接正确。
还有一个你可以考虑的选择。使用cleanup.policy=compact如果您的消息键是重复的,您可以从中受益。不仅因为该键的旧消息将被自动删除,而且您可以使用这样一个事实,即负载为null的消息将删除该键的所有消息。只是别忘了将delete.retention.ms和min.compression.lag.ms的值设置得足够小。在这种情况下,您可以使用一条消息,然后为同一个键生成空负载(但是要谨慎使用这种方法,因为这样可以删除(使用该键)您没有使用的消息)

qxgroojn

qxgroojn2#

试试这个

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

在这段代码中,您需要调用 entry.getValue().get().lowWatermark() ,因为adminclient.deleterecords(recordstodelete)返回未来的Map,所以需要通过调用get()等待未来运行

相关问题