java 如何删除AWS MSK集群中的Kafka状态存储

xwmevbvl  于 2023-11-15  发布在  Java
关注(0)|答案(1)|浏览(115)

我有一个kafkaStreams应用程序在AWS上使用MSK集群。我需要清理状态存储(在我的应用程序中使用一些KTables后创建的)。我找不到任何方法来访问MSK集群的文件系统。
我在这里发现我可以用途:

KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions.  See tip on `cleanUp()` below.
app.cleanUp();
app.start();

字符串
但在我的例子中,我使用的是Spring Kafka,我的代码中没有KafkaStreams示例,应用程序会自动启动。
我还发现,只要删除主题(我的状态存储的输入),状态存储将被删除,不知道这里需要多少时间,我试图删除主题,15分钟后状态存储看起来仍然存在,所以我只是重新创建了主题。
我还发现了这个关于获取状态存储目录路径并使用应用程序代码删除它的建议,我确信它不会工作,因为目录同时被应用程序本身使用,所以它不能被删除,也不确定应用程序可以删除集群中的任何东西:

String stateDirectory = config.getString(StreamsConfig.STATE_DIR_CONFIG);
// Delete the state directory using appropriate file operations


我认为唯一的解决方案是:创建一个标点符号或处理器或类似的东西,获取状态存储名称,将其传递给处理器并清理那里的状态存储,这似乎是一个好的解决方案吗?
先谢谢你了。

bzzcjhmw

bzzcjhmw1#

访问MSK群集文件系统的任何方法
你能SSH到代理EC2示例吗?这是唯一可能的方法。
不确定应用程序是否可以删除群集中的任何内容
正确。状态存储存储在应用运行的位置。Kafka集群仅存储KTable的压缩内部主题,而不是任何RocksDB示例元数据。
您可以使用kafka-streams-application-reset.sh删除集群上的数据。
我正在使用Spring Kafka,但我的代码中没有KafkaStreams示例
你做/应该做。它只是抽象出来的。https://docs.spring.io/spring-kafka/docs/current/reference/html/#streams-kafka-streams
如果你只有@KafkaListener消费者,那么这不是使用Kafka流。

相关问题