在给定的时间段内停止kafka流

bhmjp9jg  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(360)

我正在创建一个kafka流来将信息从一个应用程序复制到另一个应用程序,目标api有一些维护窗口,当我不需要发送数据或者我可能在它上引起问题时。
我有一个api,当有一个维护周期时它会给我这个不是问题,我想知道的是如何在给定的时间段内禁用流,并在维护窗口结束后再次启动它。
我在用java写代码

balp4ylt

balp4ylt1#

您可以按照用例所需的方式来管理kafka流状态,比如启动/停止。为此,您需要在内存中收集kafka流,并在维护期间使用 kafkaStreams.close() 以及 kafkaStreams.cleanUp() 在每个需要的流上。维护完成后,使用 kafkaStreams.start() .
监听维护可以通过多种方式进行,例如:
通过调度(例如使用 quartz 图书馆)。如果您有多个应用程序示例,则应在每个节点上触发调度程序。
Kafka的主题 maintenance_operations (例如,带有状态的消息) MAINTENANCE_STARTED 或者 MAINTENANCE_COMPLETED ). 您的应用程序将始终侦听此主题,并根据事件启动/停止所需的流。如果你有多个应用示例,每个节点都应该有一个唯一的用户组 maintenance_operations 主题。

相关问题