使用minicluster测试flink作业以使用处理时间触发计时器

xxb16uws  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(423)

在使用miniclusterwithclientresource测试flink作业时,是否有方法控制触发计时器的处理时间?
我能够测试keyedcoprocessfunction的两种方法,即processelement()。。。正在触发计时器回调,即ontimer()。。。在单元测试中,使用testharness并控制处理时间,即:
//通过直接提前操作员的处理时间触发处理时间计时器testharness.setprocessingtime(300000)
因此。我可以在指定的时间触发计时器
但是,我现在需要的是在使用minicluster miniclusterwithclientresource的端到端flink作业测试中触发计时器
val flinkcluster=新建miniclusterwithclientresource。。。并且能够提前处理时间来触发ontimer方法

vsikbqxv

vsikbqxv1#

在发送完所有消息后,在sourcefunction类中添加一个thread.sleep(1000)1秒解决了问题。

class MySourceFunction() extends RichParallelSourceFunction[]{
...

//is a one-time Delay after all messages have been sent
Thread.sleep(1000)
}

相关问题