假设我有一个带有原始状态的自定义rich函数。当flink作业结束时,如何将状态(从操作符的每个并行示例)返回到主/驱动程序代码?
abstract class MyRichMap extends RichMapFunction[SomeType, Unit] {
protected var someVar: Engine = _
override def open(parameters: Configuration): Unit = {
// assume someVar inititation here
....
}
override def map(value: SomeType): Unit = {
engine.process(value)
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
someSource.map (new MyRichMap())
env.execute()
// How to get engine or some field of it here? (e.g., engine.someCounter)
最好的方法是什么?
1条答案
按热度按时间jhkqcmku1#
如果你想测试
MyRichMap()
,然后从单元测试开始-看到了吗https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html如果您想测试一个完整的工作流,那么在单个jvm内部(例如,在本地运行命令行或eclipse)的一个简单方法是创建一个接收器,将结果捕获到一个(线程安全的)单例中,然后检查内容。这意味着您的源已完成(已绑定),因此工作流将终止。