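What are some ways to de-duplicate keys in a windowed KTable?
I added a transform based on a solution provided in another thread, but I still see the same count. Can anyone see what is wrong with this?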
final KTable<Windowed<String>, Long> aggregated = feeds
    .selectKey((k, v) -> v.getUserId().toString())
    .transform(() -> new Transformer<String, AvroMessage, KeyValue<String, AvroMessage>>() {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, AvroMessage> transform(String key, AvroMessage value) {
            Collection<String> list = Arrays.asList(key);
            // Get collection without duplicate i.e. distinct only
            List<String> distinctElements = list.stream().distinct().collect(Collectors.toList());
            key = distinctElements.toString();
            // transform value using timestamp
            return new KeyValue<>(key, value);
        }

        @Override
        public KeyValue<String, AvroMessage> punctuate(long timestamp) {
            return null;
        }

        @Override
        public void close() {
        }
    })
    // no need to specify explicit serdes because the resulting key and value types match our default serde settings
    .groupByKey()
    .count(TimeWindows.of(windowSizeMs), STATE_STORE);
Output:
[KSTREAM-AGGREGATE-0000000002]: [227338224@1517605200000] , (2<-null)
Expected:
[KSTREAM-AGGREGATE-0000000002]: [227338224@1517605200000] , (1<-null)
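One observation on the code above: Arrays.asList(key) builds a one-element list, so distinct() has nothing to remove, and every record is still forwarded to count(). De-duplicating requires remembering which keys have already been seen in each window. The following is only a minimal plain-Java sketch of that idea, independent of Kafka Streams. The class WindowedDeduplicator and its method firstInWindow are hypothetical names introduced for illustration, and the one-hour window used in the usage note is an assumption, since the post does not state the value of windowSizeMs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Kafka Streams API): reports whether a key
// is being seen for the first time within its tumbling window.
class WindowedDeduplicator {
    private final long windowSizeMs;
    // Maps each window start time to the set of keys already seen in that window.
    private final Map<Long, Set<String>> seenPerWindow = new HashMap<>();

    WindowedDeduplicator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Returns true only for the first occurrence of the key in the
    // window that contains timestampMs; later occurrences return false.
    boolean firstInWindow(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        return seenPerWindow
            .computeIfAbsent(windowStart, w -> new HashSet<>())
            .add(key);
    }
}
```

With a window size of 3600000 ms, calling firstInWindow("227338224", 1517605200000L) returns true, a second call with the same key one second later returns false, and a call one hour later returns true again because it falls into the next window. In a real topology the "seen" set would presumably have to live in a state store attached to the transformer rather than in an in-memory map, and records for which the check fails would be dropped before groupByKey().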