在kafka streams wordcount应用程序中,我按键对传入记录进行分组,并将计数存储在具体化的ktable中:
public void process(@Input(Binding.INPUT) KStream<String, String> stream) {
stream
.peek((k, v) -> {
System.out.println("INCOMING :: KEY = " + k + " , VALUE = " + v);
})
.map((k, v) -> new KeyValue<>(v, v))
.groupByKey()
.count(Materialized.as(Binding.STORENAME));
}
我正在使用interactivequeryservice从kafka商店阅读:
@Component
public class CountQueryProcessor {
@Autowired
private InteractiveQueryService interactiveQueryService;
private ReadOnlyKeyValueStore<String, Long> readOnlyKeyValueStore;
@Scheduled(initialDelay = 0, fixedDelay = 5000)
public void queryStore() {
readOnlyKeyValueStore = interactiveQueryService.getQueryableStore(Binding.STORENAME, QueryableStoreTypes.keyValueStore());
KeyValueIterator<String, Long> iterator = readOnlyKeyValueStore.all();
while(iterator.hasNext()) {
KeyValue<String, Long> next = iterator.next();
System.out.println("Query :: Word = " + next.key + " , Count = " + next.value);
}
}
}
打印ktable存储区中的值时,其值从随机值开始并开始递增。我不明白为什么会这样。
任何解决这个问题的方法和幕后的解释都会很有帮助。
暂无答案!
目前还没有任何答案,快来回答吧!