我使用的是spark 3.0.2,我有一个流式处理作业,它使用来自kafka的数据,触发持续时间为“1分钟”。
我在SparkUI中看到定义的每1分钟有一个新作业,但我看到了方法 onQueryProgress
每5~6分钟就有一次。我认为这个方法应该在每个微博客之后直接调用。
有没有办法控制这个持续时间并使它等于触发持续时间?
我使用的是spark 3.0.2,我有一个流式处理作业,它使用来自kafka的数据,触发持续时间为“1分钟”。
我在SparkUI中看到定义的每1分钟有一个新作业,但我看到了方法 onQueryProgress
每5~6分钟就有一次。我认为这个方法应该在每个微博客之后直接调用。
有没有办法控制这个持续时间并使它等于触发持续时间?
2条答案
按热度按时间mbyulnm01#
这个
inQueryProgress
streamingquerylistener的方法在每个微批处理中完成数据处理后异步调用。您将看到此侦听器仅每5~6分钟触发一次,因为流式处理作业需要这段时间来处理在微批处理中获取的所有数据。将触发器持续时间设置为1分钟将有助于相应地规划任务,但这并不意味着作业也能够在1分钟的时间范围内处理所有可用数据。
为了减少查询从kafka获取的数据量,可以使用source选项
maxOffsetsPerTrigger
.顺便说一句,如果您不处理任何数据,默认情况下每10秒调用一次此方法。如果你想避免这种情况发生,你可以做一个
if(event.progress.numInputRows > 0)
.2jcobegt2#
我发现我的案子的原因是
onQueryProgress
方法需要5分钟才能完成。就像迈克提到的那样
onQueryProgress
正在异步调用,但我认为它使用相同的线程来调用此方法。所以它正在等待方法调用完成,以便再次调用它。所以在我的案例中,解决方法是找出为什么需要这么长时间,并使其比触发持续时间更快。