如何使用Kafka Streams和Spring Kafka Streams定期(每5分钟)暂停和恢复流处理?

nuypyhwy  于 2022-12-03  发布在  Apache
关注(0)|答案(2)|浏览(264)

我是Kafka流的新手。我需要从配置文件中动态创建Kafka流,其中包含源和目标主题名称。是否可以重新启动和停止Kafka流?我的目标是使用Kafka流定期将消息从一个主题传输到另一个主题。我使用了spring cron作业并尝试关闭和打开流,但当我关闭流时无法再次启动它。我收到以下错误--〉客户端已经启动或停止,无法重新启动。我正在用java编写代码

+--------------+
               +<----- | Created (0)  |
               |       +-----+--------+
               |             |
               |             v
               |       +----+--+------+
               |       | Re-          |
               +<----- | Balancing (1)| -------->+
               |       +-----+-+------+          |
               |             | ^                 |
               |             v |                 |
               |       +--------------+          v
               |       | Running (2)  | -------->+
               |       +------+-------+          |
               |              |                  |
               |              v                  |
               |       +------+-------+     +----+-------+
               +-----> | Pending      |<--- | Error (5)  |
                       | Shutdown (3) |     +------------+
                       +------+-------+
                              |
                              v
                       +------+-------+
                       | Not          |
                       | Running (4)  |
                       +--------------+
ktca8awb

ktca8awb1#

KafkaStreams类上有两个方法pauseresume,可用于暂停和恢复处理。
https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause--https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--
您可以使用java.util.concurrent.ScheduledExecutorServicescheduleAtFixedRate方法来排程每5分钟暂停和继续一次。

lf3rwulv

lf3rwulv2#

如果您使用Spring Cloud Stream -您可以连接到BindingsLifecycleController
请参阅https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html
另一个选择是通过Actuator端点公开控件并调用它(更多信息也在同一链接中)

相关问题