如何在ktable中删除密钥

jmo0nnb3  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(306)

有哪些方法可以在带窗口的k表中重复删除密钥?
添加了基于其他线程中提供的解决方案的转换,但仍然看到相同的计数。有人能看出这有什么问题吗?

final KTable<Windowed<String>, Long> aggregated = feeds
            .selectKey((k, v) -> v.getUserId().toString())
            .transform(() -> new Transformer<String, AvroMessage, KeyValue<String, AvroMessage>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, AvroMessage> transform(String key, AvroMessage value) {

                    Collection<String> list = Arrays.asList(key);

                    // Get collection without duplicate i.e. distinct only
                    List<String> distinctElements = list.stream().distinct().collect(Collectors.toList());

                    key = distinctElements.toString();
                    // transform value using timestamp
                    return new KeyValue<>(key, value);
                }

                @Override
                public KeyValue<String, AvroMessage> punctuate(long timestamp) {
                    return null;
                }

                @Override
                public void close() {
                }
            })
            // no need to specify explicit serdes because the resulting key and value types match our default serde settings
            .groupByKey()
            .count(TimeWindows.of(windowSizeMs),STATE_STORE);

输出:

[KSTREAM-AGGREGATE-0000000002]: [227338224@1517605200000] , (2<-null)

预期

[KSTREAM-AGGREGATE-0000000002]: [227338224@1517605200000] , (1<-null)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题