flink:执行完成后访问操作符状态

c90pui9n  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(695)

假设我有一个带有原始状态的自定义rich函数。当flink作业结束时,如何将状态(从操作符的每个并行示例)返回到主/驱动程序代码?

abstract class MyRichMap extends RichMapFunction[SomeType, Unit] {

  protected var someVar: Engine = _ 

  override def open(parameters: Configuration): Unit = {
    // assume someVar inititation here
    ....
  }

  override def map(value: SomeType): Unit = {

    engine.process(value)

  }
val env = StreamExecutionEnvironment.getExecutionEnvironment
    ...
    someSource.map (new MyRichMap())

    env.execute()
    // How to get engine or some field of it here? (e.g., engine.someCounter)

最好的方法是什么?

jhkqcmku

jhkqcmku1#

如果你想测试 MyRichMap() ,然后从单元测试开始-看到了吗https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
如果您想测试一个完整的工作流,那么在单个jvm内部(例如,在本地运行命令行或eclipse)的一个简单方法是创建一个接收器,将结果捕获到一个(线程安全的)单例中,然后检查内容。这意味着您的源已完成(已绑定),因此工作流将终止。

相关问题