使用staterestorelistener和spring cloud kafka streams绑定器

ht4b089n  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(430)

我要用staterestorelistener和SpringCloudKafka流绑定器。我需要监视应用程序的容错状态存储的恢复进度。合流中有一个例子https://docs.confluent.io/current/streams/monitoring.html#streams-监视运行时状态。
为了观察所有状态存储的恢复情况,您向应用程序提供org.apache.kafka.streams.processor.staterestorelistener接口的示例。通过调用kafkastreams#setglobalstaterestorelistener方法来设置org.apache.kafka.streams.processor.staterestorelistener。
第一个问题是从应用程序中获取Kafka流。我通过使用

  1. StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
  2. KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();

第二个问题是将staterestorelistener设置为kafkastreams,因为我得到了一个错误
java.lang.illegalstateexception:只能将globalstaterestorelistener设置为created状态。当前状态为:正在运行
是否可以在SpringCloudKafka流绑定器中使用staterestorelistener?谢谢

qfe3c7zg

qfe3c7zg1#

你可以通过使用 StreamsBuilderFactoryBeanCustomizer 这样你就可以访问底层 KafkaStreams 对象。如果您使用的是3.0或更高版本的活页夹,这是推荐的方法。例如,您可以提供以下内容 bean 并使用 GlobalStateRestoreListener .

  1. @Bean
  2. public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
  3. return factoryBean -> {
  4. factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
  5. @Override
  6. public void customize(KafkaStreams kafkaStreams) {
  7. kafkaStreams.setGlobalStateRestoreListener(...);
  8. }
  9. });
  10. };
  11. }

这个博客有更多关于这个策略的细节。

相关问题