KeyedProcessFunction ordering

Posted by olhwl3o2 on 2021-07-15 in Flink

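I'm new to Flink, and I'd like to understand how Flink orders the calls to `processElement()` in its `KeyedProcessFunction` abstraction when running in parallel. Consider the following example, which produces a stream of partial sums: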

```scala
package sample

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object Playground {
  case class Record(groupId: String, score: Int)

  def main(args: Array[String]): Unit = {
    // 1. Create the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(10)

    // 2. Source
    val record1 = Record("groupX", 1)
    val record2 = Record("groupX", 2)
    val record3 = Record("groupX", 3)
    val records: DataStream[Record] =
      env.fromElements(record1, record2, record3, record1, record2, record3)

    // 3. Application logic
    val partialSums: DataStream[Int] = records
      .keyBy(record => record.groupId)
      .process(new KeyedProcessFunction[String, Record, Int] {
        // Partial sum of the scores seen so far for the current key
        lazy val partialSum: ValueState[Int] = getRuntimeContext.getState(
          new ValueStateDescriptor[Int]("partialSum", classOf[Int]))

        // Ingest a new record
        override def processElement(
            value: Record,
            ctx: KeyedProcessFunction[String, Record, Int]#Context,
            out: Collector[Int]): Unit = {
          // value() is null before the first update; Scala unboxes that null to 0
          val currentSum: Int = partialSum.value()
          partialSum.update(currentSum + value.score)
          out.collect(partialSum.value())
        }
      })

    // 4. Sink
    partialSums.print()

    // 5. Build the JobGraph and execute
    env.execute("sample-job")
  }
}
```

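I expect the output to be the stream 1, 3, 6, 7, 9, 12, and in fact it is.
Can we safely assume this will always be the case, especially when reading from sources with a high degree of parallelism?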

Answer by ryoqjall:

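In your example, ordering is guaranteed within each key. Since there is only one key, you will always get 1, 3, 6, 7, 9, 12.
When you read from a source with parallelism greater than 1, the various source instances race against each other. When streams from two or more sources are combined (e.g., via keyBy, union, rebalance, etc.), the resulting order is nondeterministic, although the events coming from each source instance keep their relative order.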
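A plain-Scala model can make the per-key guarantee concrete. The sketch below does not use Flink at all: it assumes the keyed operator receives records in some given arrival order, and it replaces `ValueState` with an immutable `Map` keyed by `groupId` (the names `KeyedRunningSumModel`, `run` and `sumsFor` are hypothetical). It reproduces 1, 3, 6, 7, 9, 12 for the single-key input, and shows that with two keys, reordering records across keys changes the combined output but not the sequence of sums emitted for any one key.

```scala
// Plain-Scala model (no Flink dependency) of a keyed running sum.
// Assumption: records reach the keyed operator in the order given by `arrivals`;
// per-key state is modelled with a Map instead of Flink's ValueState.
object KeyedRunningSumModel {
  final case class Record(groupId: String, score: Int)

  // Returns (key, partial sum) pairs in emission order, like processElement + out.collect.
  def run(arrivals: Seq[Record]): List[(String, Int)] = {
    val (_, emittedReversed) =
      arrivals.foldLeft((Map.empty[String, Int], List.empty[(String, Int)])) {
        case ((state, emitted), r) =>
          // A key with no state yet starts at 0, like an unset ValueState[Int] read in Scala
          val sum = state.getOrElse(r.groupId, 0) + r.score
          (state.updated(r.groupId, sum), (r.groupId, sum) :: emitted)
      }
    emittedReversed.reverse
  }

  // Keeps only the sums emitted for one key, preserving their order.
  def sumsFor(key: String, emitted: List[(String, Int)]): List[Int] =
    emitted.collect { case (`key`, s) => s }

  def main(args: Array[String]): Unit = {
    val single = Seq(1, 2, 3, 1, 2, 3).map(s => Record("groupX", s))
    println(run(single).map(_._2)) // List(1, 3, 6, 7, 9, 12)

    // Two keys, two different cross-key arrival orders
    val a = Seq(Record("x", 1), Record("y", 10), Record("x", 2), Record("y", 20))
    val b = Seq(Record("y", 10), Record("y", 20), Record("x", 1), Record("x", 2))
    println(sumsFor("x", run(a)) == sumsFor("x", run(b))) // true
    println(run(a) == run(b)) // false
  }
}
```

The model only holds if every record of a key arrives through one ordered path; as explained next, a parallel source can break that assumption.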
For example, if you have

    stream X: 1, 2, 3, 4
    stream Y: a, b, c, d

then merging these two streams might produce 1, 2, 3, 4, a, b, c, d, or a, b, 1, 2, 3, c, 4, d, and so on.
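The merge behaviour can be modelled the same way. This sketch, again plain Scala with hypothetical names and no Flink APIs, enumerates every interleaving of two ordered inputs that keeps each input's relative order, checks the two orders mentioned above, and then assumes a hypothetical split of the question's input across two source instances (each emitting scores 1, 2, 3) to show that the partial sums seen after keyBy depend on which interleaving occurs, while the final total does not.

```scala
// Plain-Scala model (no Flink) of merging two ordered streams: any result is an
// interleaving that keeps each input's relative order, but which one occurs is not fixed.
object InterleavingModel {
  // Every merge of xs and ys that preserves the relative order within each input.
  def interleavings[A](xs: List[A], ys: List[A]): List[List[A]] = (xs, ys) match {
    case (Nil, _) => List(ys)
    case (_, Nil) => List(xs)
    case (x :: xt, y :: yt) =>
      interleavings(xt, ys).map(x :: _) ++ interleavings(xs, yt).map(y :: _)
  }

  // True if `merged` consists of exactly xs and ys, each in its original order.
  def isInterleaving[A](merged: List[A], xs: List[A], ys: List[A]): Boolean =
    (merged, xs, ys) match {
      case (Nil, Nil, Nil) => true
      case (m :: mt, _, _) =>
        (xs.headOption.contains(m) && isInterleaving(mt, xs.tail, ys)) ||
          (ys.headOption.contains(m) && isInterleaving(mt, xs, ys.tail))
      case _ => false
    }

  // Running sum, as a single-key KeyedProcessFunction would emit it.
  def runningSums(scores: List[Int]): List[Int] = scores.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit = {
    val x = List("1", "2", "3", "4")
    val y = List("a", "b", "c", "d")
    println(interleavings(x, y).size) // 70
    println(isInterleaving(List("1", "2", "3", "4", "a", "b", "c", "d"), x, y)) // true
    println(isInterleaving(List("a", "b", "1", "2", "3", "c", "4", "d"), x, y)) // true
    println(isInterleaving(List("2", "1", "3", "4", "a", "b", "c", "d"), x, y)) // false

    // Hypothetical split: each of two source instances emits scores 1, 2, 3
    val outputs = interleavings(List(1, 2, 3), List(1, 2, 3)).map(runningSums).distinct
    println(outputs.size > 1) // true: partial sums depend on arrival order
    println(outputs.forall(_.last == 12)) // true: the final total does not
  }
}
```

For instance, under this assumed split the arrival order 1, 1, 2, 2, 3, 3 yields partial sums 1, 2, 4, 6, 9, 12 rather than 1, 3, 6, 7, 9, 12.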
