apache flink:如何处理三个流

oo7oh9g9  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(333)

我想在一个操作符中接收和处理三个流。例如,在storm中实现的代码如下: builder.setBolt("C_bolt", C_bolt(), parallelism_hint) .fieldsGrouping("A_bolt", "TRAINING", new Fields("word")) .fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word")) .allGrouping("A_bolt", "SUM"); 在Flink,处理 SUM stream(A_bolt's SideOutput) 以及 TRAINING stream(A_bolt) 已实施:

SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
                        .keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
                                    @Override
                                    public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
                                        return in.f0;
                                    }
                                })
                        .connect(Sum)
                        .flatMap(new Process())
                        .setParallelism(parallelism);

但我不知道怎么补充 ANALYSIS stream(B_bolt) . 谢谢你的帮助。

7gcisfzg

7gcisfzg1#

flink只支持一个输入和两个输入流操作符。您可以选择:
使用union()创建一个合并流,其中包含所有三个流中的所有元素(这三个流必须是同一类型的,不过您可以使用其中一个来帮助实现这一点)。
在使用coflatmap组合两个流之后,将该初步结果连接到第三个流,使用另一个coflatmap(或协处理函数)来完成处理。
或者这两种技术的结合在你的案例中更可取。

相关问题