如何过滤一个依赖于另一个的flink数据流?

cbeh67ev  于 2023-09-28  发布在  Apache
关注(0)|答案(1)|浏览(119)

我正在学习Apache Flink,下面提到的用例似乎很简单,Flink不支持它?让我开始觉得我理解错了
用例是来自flink docs的这个简单示例。
假设我们有这些静态元素,我想过滤掉。我们也有不断流动的动态单词。

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> control = env
    .fromElements("DROP", "IGNORE")
    .keyBy(x -> x);

DataStream<String> streamOfWords = env
    .fromElements("Apache", "DROP", "Flink", "IGNORE")
    .keyBy(x -> x);

control
    .connect(streamOfWords)
    .flatMap(new ControlFunction())
    .print();

env.execute();

}

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
  
@Override
public void open(Configuration config) {
    blocked = getRuntimeContext()
        .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
  
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
    blocked.update(Boolean.TRUE);
}
  
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
    if (blocked.value() == null) {
        out.collect(data_value);
    }
}

}
文件中指出:
重要的是要认识到,您无法控制flatMap 1和flatMap 2回调的调用顺序。......在时间和/或顺序很重要的情况下,您可能会发现有必要在托管Flink状态下缓冲事件,直到您的应用程序准备好处理它们。
如果我们不知道这些词的消费顺序,我们如何真实的过滤这些词?
我是否误解了如何以及为什么必须从根本上使用Apache Flink?
我是否应该以非flink API的方式存储我的控制字,例如作为一个局部变量,并且只使用flink API处理流字。或者说,对于这种情况,什么是最合适的方法呢?

qnyhuwrf

qnyhuwrf1#

在真正的流环境中,没有完美的解决方案来连接两个流,因为(如文档中所述)
您无法控制flatMap 1和flatMap 2回调的调用顺序
因此,在完整性/准确性和延迟之间总是存在权衡,作为应用程序开发人员,您需要决定。
标准的解决方案是使用Flink状态(例如ListState)来缓冲与过滤器状态不匹配的传入元素(在您的示例中为streamOfWords),以及一个计时器,当您等待“足够长的时间”时,它将触发。当计时器触发时,将发出所有缓冲的元素。
请注意,您还可以使用支持临时连接的Table API来执行上述操作。

相关问题