我正在学习Apache Flink,下面提到的用例似乎很简单,Flink不支持它?让我开始觉得我理解错了
用例是来自flink docs的这个简单示例。
假设我们有这些静态元素,我想过滤掉。我们也有不断流动的动态单词。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);
DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);
control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
文件中指出:
重要的是要认识到,您无法控制flatMap 1和flatMap 2回调的调用顺序。......在时间和/或顺序很重要的情况下,您可能会发现有必要在托管Flink状态下缓冲事件,直到您的应用程序准备好处理它们。
如果我们不知道这些词的消费顺序,我们如何真实的过滤这些词?
我是否误解了如何以及为什么必须从根本上使用Apache Flink?
我是否应该以非flink API的方式存储我的控制字,例如作为一个局部变量,并且只使用flink API处理流字。或者说,对于这种情况,什么是最合适的方法呢?
1条答案
按热度按时间qnyhuwrf1#
在真正的流环境中,没有完美的解决方案来连接两个流,因为(如文档中所述)
您无法控制flatMap 1和flatMap 2回调的调用顺序
因此,在完整性/准确性和延迟之间总是存在权衡,作为应用程序开发人员,您需要决定。
标准的解决方案是使用Flink状态(例如
ListState
)来缓冲与过滤器状态不匹配的传入元素(在您的示例中为streamOfWords
),以及一个计时器,当您等待“足够长的时间”时,它将触发。当计时器触发时,将发出所有缓冲的元素。请注意,您还可以使用支持临时连接的Table API来执行上述操作。