Flink streaming: from one window, look up state in another window

wtzytmuj, published 2021-06-21 in Flink

I have two streams:
Measurement
WhoMeasured (metadata about who took the measurement)
Here are their case classes:

  case class Measurement(var value: Int, var who_measured_id: Int)
  case class WhoMeasured(var who_measured_id: Int, var name: String)

The Measurement stream carries a lot of data; the WhoMeasured stream carries very little. In fact, for each who_measured_id only the latest name in the WhoMeasured stream is relevant, so an old element can be discarded when a new element with the same who_measured_id arrives. This is essentially a changelog maintained by the WhoMeasured stream.
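The keep-latest-per-key semantics described above can be sketched in plain Scala, outside Flink (the function name latestNames is made up for illustration):

```scala
// Mirrors the question's case class.
case class WhoMeasured(who_measured_id: Int, name: String)

// Fold a sequence of WhoMeasured events into a lookup table where,
// per who_measured_id, the most recently seen name wins.
def latestNames(events: Seq[WhoMeasured]): Map[Int, String] =
  events.foldLeft(Map.empty[Int, String]) { (acc, w) =>
    acc + (w.who_measured_id -> w.name)
  }
```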
In my custom window function:

  class WFunc extends WindowFunction[Measurement, Long, Int, TimeWindow] {
    override def apply(key: Int, window: TimeWindow, input: Iterable[Measurement], out: Collector[Long]): Unit = {
      // Here I need access to the WhoMeasured stream to get the name of the person who took a measurement
      // The following two are equivalent since I keyed by who_measured_id
      val name_who_measured = magic(key)
      val name_who_measured = magic(input.head.who_measured_id)
    }
  }

Here is my job. As you can see, something is missing: the combination of the two streams.

  val who_measured_stream = who_measured_source
    .keyBy(w => w.who_measured_id)
    .countWindow(1)

  val measurement_stream = measurements_source
    .keyBy(m => m.who_measured_id)
    .timeWindow(Time.seconds(60), Time.seconds(5))
    .apply(new WFunc)

So essentially this acts as a lookup table that is updated whenever a new element arrives on the WhoMeasured stream.
The question is: how do I get from one WindowedStream into the other?
Follow-up:
After implementing it the way Fabian suggested, the job always fails with a serialization error:

  [info] Loading project definition from /home/jgroeger/Code/MeasurementJob/project
  [info] Set current project to MeasurementJob (in build file:/home/jgroeger/Code/MeasurementJob/)
  [info] Compiling 8 Scala sources to /home/jgroeger/Code/MeasurementJob/target/scala-2.11/classes...
  [info] Running de.company.project.Main dev MeasurementJob
  [error] Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichCoFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
  [error] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
  [error] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1478)
  [error] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
  [error] at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:230)
  [error] at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:127)
  [error] at de.company.project.jobs.MeasurementJob.run(MeasurementJob.scala:139)
  [error] at de.company.project.Main$.main(Main.scala:55)
  [error] at de.company.project.Main.main(Main.scala)
  [error] Caused by: java.io.NotSerializableException: de.company.project.jobs.MeasurementJob
  [error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  [error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  [error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  [error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  [error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  [error] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
  [error] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
  [error] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
  [error] ... 7 more
  java.lang.RuntimeException: Nonzero exit code returned from runner: 1
  at scala.sys.package$.error(package.scala:27)
  [trace] Stack trace suppressed: run last MeasurementJob/compile:run for the full output.
  [error] (MeasurementJob/compile:run) Nonzero exit code returned from runner: 1
  [error] Total time: 9 s, completed Nov 15, 2016 2:28:46 PM
  Process finished with exit code 1

The error message:

  The implementation of the RichCoFlatMapFunction is not serializable. The object probably contains or references non serializable fields.

However, the only field of the suggested JoiningCoFlatMap is the ValueState.
The signature looks like this:

  class JoiningCoFlatMap extends RichCoFlatMapFunction[Measurement, WhoMeasured, (Measurement, String)] {
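For reference, the `Caused by: java.io.NotSerializableException: de.company.project.jobs.MeasurementJob` line in the stack trace suggests the function was defined as an inner class of the job, so serializing it drags in the enclosing (non-serializable) job instance. A minimal, Flink-free sketch of that mechanism (the class names MyJob, InnerFn, and TopLevelFn are invented for illustration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the job class; like MeasurementJob, it is NOT Serializable.
class MyJob {
  // Inner class: its instances carry a hidden reference to the enclosing MyJob.
  class InnerFn extends Serializable {
    def apply(x: Int): Int = x + 1
  }
  def makeInner(): InnerFn = new InnerFn
}

// Top-level class: no hidden outer reference, serializes cleanly.
class TopLevelFn extends Serializable {
  def apply(x: Int): Int = x + 1
}

// Returns true if Java serialization succeeds, false on NotSerializableException.
def trySerialize(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }
```

Serializing the inner function fails exactly like the stack trace above, while the top-level one succeeds; the usual fix is to define JoiningCoFlatMap as a top-level class (or inside a companion object) so it holds no hidden outer reference.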

xyhw6mcr1#

I think what you want to do is a window operation followed by a join.
You can implement the join with a stateful CoFlatMapFunction, as in the example below:

  val measures: DataStream[Measurement] = ???
  val who: DataStream[WhoMeasured] = ???

  val agg: DataStream[(Int, Long)] = measures
    .keyBy(_.who_measured_id)
    .timeWindow(Time.seconds(60), Time.seconds(5))
    .apply( (id: Int, w: TimeWindow, v: Iterable[Measurement], out: Collector[(Int, Long)]) => {
      // do your aggregation
    })

  val joined: DataStream[(Int, Long, String)] = agg
    .keyBy(_._1) // who_measured_id
    .connect(who.keyBy(_.who_measured_id))
    .flatMap(new JoiningCoFlatMap)

  // CoFlatMapFunction
  class JoiningCoFlatMap extends RichCoFlatMapFunction[(Int, Long), WhoMeasured, (Int, Long, String)] {

    var names: ValueState[String] = null

    override def open(conf: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor[String](
        "whoMeasuredName",
        classOf[String],
        "" // default value
      )
      names = getRuntimeContext.getState(stateDescriptor)
    }

    override def flatMap1(a: (Int, Long), out: Collector[(Int, Long, String)]): Unit = {
      // join with state
      out.collect( (a._1, a._2, names.value()) )
    }

    override def flatMap2(w: WhoMeasured, out: Collector[(Int, Long, String)]): Unit = {
      // update state
      names.update(w.name)
    }
  }

A note on the execution: a CoFlatMapFunction cannot choose which input to process, i.e., whether flatMap1 or flatMap2 is called depends on what data arrives at the operator and cannot be controlled by the function. This is an issue when the state is being initialized: at the beginning, the state might not hold the correct name for an arriving Measurement object and would return the default value instead. You can avoid that by buffering the measurements and joining them once the first element of the who stream has arrived. You need another state for that.
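The buffering approach described in the last paragraph can be sketched as per-key logic in plain Scala (the class BufferingJoin and its method names are invented for illustration; in Flink, name would be a ValueState[String] and buffer a ListState[Long] inside the RichCoFlatMapFunction):

```scala
import scala.collection.mutable.ArrayBuffer

// Mirrors the question's case class.
case class WhoMeasured(who_measured_id: Int, name: String)

// Per-key join state: buffers window aggregates until the name is known.
class BufferingJoin {
  private var name: Option[String] = None      // would be ValueState[String] in Flink
  private val buffer = ArrayBuffer.empty[Long] // would be ListState[Long] in Flink

  // Corresponds to flatMap1: an aggregate arrives.
  // Emit immediately if the name is known, otherwise buffer it.
  def onAggregate(agg: Long): Seq[(Long, String)] = name match {
    case Some(n) => Seq((agg, n))
    case None    => buffer += agg; Seq.empty
  }

  // Corresponds to flatMap2: the name arrives.
  // Record it and flush everything buffered so far.
  def onName(w: WhoMeasured): Seq[(Long, String)] = {
    name = Some(w.name)
    val flushed = buffer.map(a => (a, w.name)).toList
    buffer.clear()
    flushed
  }
}
```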

