我要用staterestorelistener和SpringCloudKafka流绑定器。我需要监视应用程序的容错状态存储的恢复进度。合流中有一个例子https://docs.confluent.io/current/streams/monitoring.html#streams-监视运行时状态。
为了观察所有状态存储的恢复情况,您向应用程序提供org.apache.kafka.streams.processor.staterestorelistener接口的示例。通过调用kafkastreams#setglobalstaterestorelistener方法来设置org.apache.kafka.streams.processor.staterestorelistener。
第一个问题是从应用程序中获取Kafka流。我通过使用
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
第二个问题是将staterestorelistener设置为kafkastreams,因为我得到了一个错误
java.lang.illegalstateexception:只能将globalstaterestorelistener设置为created状态。当前状态为:正在运行
是否可以在SpringCloudKafka流绑定器中使用staterestorelistener?谢谢
1条答案
按热度按时间qfe3c7zg1#
你可以通过使用
StreamsBuilderFactoryBeanCustomizer
这样你就可以访问底层KafkaStreams
对象。如果您使用的是3.0或更高版本的活页夹,这是推荐的方法。例如,您可以提供以下内容bean
并使用GlobalStateRestoreListener
.这个博客有更多关于这个策略的细节。