:“distinct()”是否可以在flink的数据流中使用?

368yc8dk  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(423)

我想知道flink的datastreamapi是否可以用来从传入的记录(可能是在特定的时间窗口内)中删除重复项,就像datasetapi提供了一个称为“distinct”的转换一样。或者无论如何,如果数据集可以转换为datastream,假设数据集在flink中转换为datastream进行内部处理。
请帮帮我。提前谢谢!干杯!

rqdpfwrv

rqdpfwrv1#

我不知道任何内置原语,但是如果窗口中的所有数据都适合内存,那么您可以自己轻松地构建这个函数。

DataStream<...> stream = ...
stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new DistinctFunction<>());

public class DistinctFunction<T, W extends Window> extends ProcessAllWindowFunction<T, T, W> implements Function {
    public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception {
        Set<T> elements = new HashSet<>();
        input.forEach(elements::add);
        elements.forEach(out::collect);
    }
}

当然,如果你有一个键的话,它的可伸缩性要大得多,因为只有窗口中一个键的数据需要保存在内存中。

相关问题