在使用miniclusterwithclientresource测试flink作业时,是否有方法控制触发计时器的处理时间?
我能够测试keyedcoprocessfunction的两种方法,即processelement()。。。正在触发计时器回调,即ontimer()。。。在单元测试中,使用testharness并控制处理时间,即:
//通过直接提前操作员的处理时间触发处理时间计时器testharness.setprocessingtime(300000)
因此。我可以在指定的时间触发计时器
但是,我现在需要的是在使用minicluster miniclusterwithclientresource的端到端flink作业测试中触发计时器
val flinkcluster=新建miniclusterwithclientresource。。。并且能够提前处理时间来触发ontimer方法
1条答案
按热度按时间vsikbqxv1#
在发送完所有消息后,在sourcefunction类中添加一个thread.sleep(1000)1秒解决了问题。