kafka

4ngedf3f  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(280)

我正在用SpringKafka编写一个SpringBoot应用程序。由于我的应用程序是集中在Kafka流,我需要使用交互式查询和查询我的国有商店我想知道:有没有什么特别的方式访问Kafka流国有商店与SpringKafka?
据我所见,springcloudstreambinderkafka流对交互式查询提供了一些支持,但我在springkafka中找不到任何关于它们的信息。在《Spring的Kafka》里,我是错过了什么还是没有人支持?
如果是这样的话,在创建我自己版本的 org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService ?
当我只使用kafka和kafka流时,包含spring云流似乎有点太多了,但是如果提供了交互式查询支持,那么就很难单独实现,可能还是建议包含它?如有任何建议,我将不胜感激。

tjrkku2a

tjrkku2a1#

如果您使用spring-kafka和kafka-streams,我建议您将kafkastreams定义为@bean,这样spring将负责对象的生命周期,比如

@Configuration
public class KafkaConfig{
  @Bean
  KafkaStreams ingestAndAggregateStream(KafkaProperties kafkaProps){
  Topology t....
  KafkaStreams stream = new KafkaStreams(t,kafkaProps.buildStreamsProperties())
  stream.start();
  return stream;
  }
}

一旦您这样做了,您就可以在控制器中自动连接流,并使用它来检索和查询本地kvstore

@RestController
public class StreamingController {

  private final KafkaStreams ingestAndAggregateStream;
  public StreamingController(KafkaStreams ingestAndAggregateStream){
    this.ingestAndAggregateStream = ingestAndAggregateStream;
  }

  @RequestMapping(value = "/queryStore", method = RequestMethod.GET)
  @ResponseBody
  public Value getKVStoreValue(@RequestParam String key){
    ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
        .store("KV_STORE_NAME",
            QueryableStoreTypes.<String, Value>keyValueStore());
    return store.get(key);
  }
}

相关问题