如何在flink应用程序中指定两个源,一个进程操作符和一个接收器操作符

l2osamch  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(337)

我使用的是flink1.3,我定义了两个流源,它们将发出相同的事件供后续操作符(我定义的process操作符和sink操作符)处理
但在源进程pink管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器

object FlinkApplication {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.addSource(new MySource1()) //How to MySource2 here?
      .setParallelism(1)
      .name("source1")
      .process(new MyProcess())
      .setParallelism(4)
      .addSink(new MySink())
      .setParallelism(2)
    env.execute("FlinkApplication")
  }

}
xriantvc

xriantvc1#

api在如何设置处理管道方面提供了很大的灵活性。如果要对多个源应用相同的逻辑,可以执行此操作:

env.addSource(new MySource1())
  .process(new MyProcess())
  .addSink(new MySink())

env.addSource(new MySource2())
  .process(new MyProcess())
  .addSink(new MySink())

env.execute()

或者,如果这样做更有意义,您可以合并两个流,然后处理合并的流(或这些方法的某种组合):

stream1.union(stream2)
  .process(...)
  .addSink(...)

如果您希望分叉流并对每个副本应用不同的操作,也可以使用另一种方法:

val stream: DataStream[T] = env.addSource(new MySource())

stream.process(new MyProcess1())
  .addSink(new MySink1())

stream.process(new MyProcess2())
  .addSink(new MySink2())

env.execute()

哇,flink 1.3已经三岁多了!

相关问题