我是Kafka流的新手。我需要从配置文件中动态创建Kafka流,其中包含源和目标主题名称。是否可以重新启动和停止Kafka流?我的目标是使用Kafka流定期将消息从一个主题传输到另一个主题。我使用了spring cron作业并尝试关闭和打开流,但当我关闭流时无法再次启动它。我收到以下错误--〉客户端已经启动或停止,无法重新启动。我正在用java编写代码
+--------------+
+<----- | Created (0) |
| +-----+--------+
| |
| v
| +----+--+------+
| | Re- |
+<----- | Balancing (1)| -------->+
| +-----+-+------+ |
| | ^ |
| v | |
| +--------------+ v
| | Running (2) | -------->+
| +------+-------+ |
| | |
| v |
| +------+-------+ +----+-------+
+-----> | Pending |<--- | Error (5) |
| Shutdown (3) | +------------+
+------+-------+
|
v
+------+-------+
| Not |
| Running (4) |
+--------------+
2条答案
按热度按时间ktca8awb1#
KafkaStreams
类上有两个方法pause
和resume
,可用于暂停和恢复处理。https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#pause--https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#resume--
您可以使用
java.util.concurrent.ScheduledExecutorService
的scheduleAtFixedRate
方法来排程每5分钟暂停和继续一次。lf3rwulv2#
如果您使用Spring Cloud Stream -您可以连接到BindingsLifecycleController
请参阅https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html
另一个选择是通过Actuator端点公开控件并调用它(更多信息也在同一链接中)