我使用的是flink1.3,我定义了两个流源,它们将发出相同的事件供后续操作符(我定义的process操作符和sink操作符)处理
但在源进程pink管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器
object FlinkApplication {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.addSource(new MySource1()) //How to MySource2 here?
.setParallelism(1)
.name("source1")
.process(new MyProcess())
.setParallelism(4)
.addSink(new MySink())
.setParallelism(2)
env.execute("FlinkApplication")
}
}
1条答案
按热度按时间xriantvc1#
api在如何设置处理管道方面提供了很大的灵活性。如果要对多个源应用相同的逻辑,可以执行此操作:
或者,如果这样做更有意义,您可以合并两个流,然后处理合并的流(或这些方法的某种组合):
如果您希望分叉流并对每个副本应用不同的操作,也可以使用另一种方法:
哇,flink 1.3已经三岁多了!