streams.setStateListener((KafkaStreams.State newState, KafkaStreams.State old)->{
logger.info()
.message("Thread state has changed from {" + old.name() + "} to {" + newState.name() + "}")
.log();
// here you can either register a metric or a global state variable that can be used in a health probe api.
});
2条答案
按热度按时间r7xajy2e1#
为此,您可以注册一个状态侦听器,侦听流线程发生的每个状态更改(已创建、重新平衡、pending_shutdown、正在运行......)。
下面是如何注册stateListener的示例代码
5sxhfpxr2#
您可以使用KafkaStreams.setGlobalStateRestoreListner来设置
StateRestoreListener
,该StateRestoreListener
在使用onRestoreEnd
方法完成还原时提供通知。让我知道你的情况如何。
比尔·HTH