我正在创建一个kafka流来将信息从一个应用程序复制到另一个应用程序,目标api有一些维护窗口,当我不需要发送数据或者我可能在它上引起问题时。我有一个api,当有一个维护周期时它会给我这个不是问题,我想知道的是如何在给定的时间段内禁用流,并在维护窗口结束后再次启动它。我在用java写代码
balp4ylt1#
您可以按照用例所需的方式来管理kafka流状态,比如启动/停止。为此,您需要在内存中收集kafka流,并在维护期间使用 kafkaStreams.close() 以及 kafkaStreams.cleanUp() 在每个需要的流上。维护完成后,使用 kafkaStreams.start() .监听维护可以通过多种方式进行,例如:通过调度(例如使用 quartz 图书馆)。如果您有多个应用程序示例,则应在每个节点上触发调度程序。Kafka的主题 maintenance_operations (例如,带有状态的消息) MAINTENANCE_STARTED 或者 MAINTENANCE_COMPLETED ). 您的应用程序将始终侦听此主题,并根据事件启动/停止所需的流。如果你有多个应用示例,每个节点都应该有一个唯一的用户组 maintenance_operations 主题。
kafkaStreams.close()
kafkaStreams.cleanUp()
kafkaStreams.start()
quartz
maintenance_operations
MAINTENANCE_STARTED
MAINTENANCE_COMPLETED
1条答案
按热度按时间balp4ylt1#
您可以按照用例所需的方式来管理kafka流状态,比如启动/停止。为此,您需要在内存中收集kafka流,并在维护期间使用
kafkaStreams.close()
以及kafkaStreams.cleanUp()
在每个需要的流上。维护完成后,使用kafkaStreams.start()
.监听维护可以通过多种方式进行,例如:
通过调度(例如使用
quartz
图书馆)。如果您有多个应用程序示例,则应在每个节点上触发调度程序。Kafka的主题
maintenance_operations
(例如,带有状态的消息)MAINTENANCE_STARTED
或者MAINTENANCE_COMPLETED
). 您的应用程序将始终侦听此主题,并根据事件启动/停止所需的流。如果你有多个应用示例,每个节点都应该有一个唯一的用户组maintenance_operations
主题。