在wordcount应用程序的上下文中,kafka ktable中的单词计数从一些随机值开始

flmtquvp  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(143)

在kafka streams wordcount应用程序中,我按键对传入记录进行分组,并将计数存储在具体化的ktable中:

public void process(@Input(Binding.INPUT) KStream<String, String> stream) {
   stream
            .peek((k, v) -> {
                System.out.println("INCOMING :: KEY = " + k + " , VALUE = " + v);
            })
            .map((k, v) -> new KeyValue<>(v, v))
            .groupByKey()
            .count(Materialized.as(Binding.STORENAME));
}

我正在使用interactivequeryservice从kafka商店阅读:

@Component
public class CountQueryProcessor {

    @Autowired
    private InteractiveQueryService interactiveQueryService;

    private ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore;

    @Scheduled(initialDelay = 0, fixedDelay = 5000)
    public void queryStore() {
        readOnlyKeyValueStore = interactiveQueryService.getQueryableStore(Binding.STORENAME, QueryableStoreTypes.keyValueStore());

        KeyValueIterator<String, Long> iterator = readOnlyKeyValueStore.all();
        while(iterator.hasNext())   {
            KeyValue<String, Long> next = iterator.next();
            System.out.println("Query :: Word = " + next.key + " , Count = " + next.value);
        }
    }
}

打印ktable存储区中的值时,其值从随机值开始并开始递增。我不明白为什么会这样。
任何解决这个问题的方法和幕后的解释都会很有帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题