How to use the Kafka query API

z18hc3ub  asked on 2021-06-04  in Kafka

I'm still new to Kafka and Kafka Streams. I have a basic working Spring service with a Kafka producer, a consumer, a KStream, and a KTable. Now I want to inspect my KTable records, and to do that I tried to use the Kafka query API.
Without the Spring integration, this can be done as follows:

KafkaStreams streams = new KafkaStreams(topology, config);
// Get access to the custom store
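// (MyReadableCustomStore / MyCustomStoreType here stand for a custom read interface and its QueryableStoreType implementation)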
MyReadableCustomStore<String,String> store = streams.store("the-custom-store", new MyCustomStoreType<String,String>());
// Query the store
String value = store.read("key");

Now I'm trying to do the querying with the Spring-based InteractiveQueryService instead, but I've run into some issues with it in Spring.
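For reference, here is a minimal sketch of what that approach looks like, assuming the InteractiveQueryService comes from the Spring Cloud Stream Kafka Streams binder (the class name StoreQueryService and the store name "the-custom-store" below are just placeholders for this example):

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.stereotype.Service;

@Service
public class StoreQueryService {

    private final InteractiveQueryService interactiveQueryService;

    // The Spring Cloud Stream Kafka Streams binder provides this bean
    public StoreQueryService(InteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    public String read(String key) {
        // Look up the materialized store by name, then query it like a regular key-value store
        ReadOnlyKeyValueStore<String, String> store = interactiveQueryService
                .getQueryableStore("the-custom-store", QueryableStoreTypes.<String, String>keyValueStore());
        return store.get(key);
    }
}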
What is the best way to use the Kafka query API with Spring?
The Spring Kafka configuration in my service looks like this:

@Bean("streamsBuilder")
public StreamsBuilderFactoryBean recordsStreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    // set some properties
    return new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(config));
}

Can you offer any suggestions?

zbdgwd5y (answer 1)

Here is a Spring Boot application that shows how...

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
@EnableKafkaStreams
public class So58918956Application {

    public static void main(String[] args) {
        SpringApplication.run(So58918956Application.class, args);
    }

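    // Latch that is released once the Kafka Streams instance reaches the RUNNING state,
    // so the state store is not queried before it is ready.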
    @Bean
    public CountDownLatch latch(StreamsBuilderFactoryBean streamsBuilderFB) {
        CountDownLatch latch = new CountDownLatch(1);
        streamsBuilderFB.setStateListener((newState, oldState) -> {
            if (State.RUNNING.equals(newState)) {
                latch.countDown();
            }
        });
        return latch;
    }

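    // Materialize the topic into a queryable state store named "the-custom-store".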
    @Bean
    public KTable<String, String> table(StreamsBuilder streamsBuilder) {
        Serde<String> serde = Serdes.String();
        KTable<String, String> table = streamsBuilder.table("so58918956",
                Consumed.with(serde, serde)
                        .withOffsetResetPolicy(AutoOffsetReset.EARLIEST), 
                Materialized.as("the-custom-store"));
        return table;
    }

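    // Send a test record, wait for the streams app to start, then query the store by name.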
    @Bean
    public ApplicationRunner runner(StreamsBuilderFactoryBean streamsBuilderFB,
            KafkaTemplate<String, String> template, KTable<String, String> table) {

        return args -> {
            template.send("so58918956", "key", "value");
            latch(streamsBuilderFB).await(10, TimeUnit.SECONDS);
            ReadOnlyKeyValueStore<String, String> store = streamsBuilderFB.getKafkaStreams().store(
                    table.queryableStoreName(),
                    QueryableStoreTypes.keyValueStore());
            System.out.println(store.get("key"));
        };
    }

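    // Provision the input topic.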
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58918956").partitions(1).replicas(1).build();
    }

}
