我有Spark流与直接流,我用下面的配置
批处理间隔60s
spark.streaming.kafka.MaxRatePer42分区
auto.offset.reset最早
当我用最早的选项启动流处理批处理时,为了更快地使用来自kafka的消息并减少延迟,我将spark.streaming.kafka.maxrateperpartition保留为42。因此,它应该消耗42 x 60s x 60分区=每批151200条记录。
我有两个问题
我看到最初的几批正确地消耗了151200条记录,在后来的几批中逐渐减少,尽管Kafka有很多记录要消耗。请看下面的截图。原因是什么
我看到成批的人排起了很多队。我们怎样才能避免这种情况。
有没有可能实现下面的场景我们把批处理时间间隔定为60秒,如果每个批都在60秒内运行,下一批就可以准时开始。如果一批时间超过60秒,我们不希望下一批进入队列。一旦现有的运行完成,下一个运行就可以通过拾取记录开始,直到该时间为止。这样我们就不会有滞后,也不会排队成批。
spark ui-问题1的屏幕截图
1条答案
按热度按时间8fq7wneg1#
您所观察到的是spark的背压机制的预期行为。
您已经设置了配置
spark.streaming.kafka.maxRatePerPartition
到42,并且正如您所计算的,作业将开始抓取查看所附屏幕截图中的时间(处理时间),作业将以该数量的记录开始。
但是,由于处理所有这些151200条记录需要超过60秒的时间,因此背压机制将减少后续批处理中的输入记录。这只发生在几个批次之后,因为背压机制(也称为“pid控制器”)需要等到第一个批次完成,以便它可以使用该经验来估计下一个间隔的输入记录数。如前所述,处理前151200所花费的时间比仅仅一个间隔要长,这意味着随后的两个间隔已经用maxrateperpartition进行了调度,而没有完成批处理间隔的经验。
这就是为什么您只在第四批中看到输入记录递减的原因。然后,输入记录的数量仍然太高,无法在60秒内进行处理,因此该作业建立了越来越多的延迟,pid控制器(背压)最终意识到它落后于许多记录,并且正在将输入记录的数量大幅减少到由设置的最小值
spark.streaming.backpressure.pid.minRate
. 在您的例子中,这个值似乎被设置为2,这导致每批间隔26060=7200条记录。总而言之,你观察到的是预期和预期的行为。流式处理作业需要一些批处理来理解和了解它应该从kafka获取多少数据,以适应给定的(非灵活的)批处理间隔60秒。不管一个批次的处理时间有多长,流式处理作业都会以每60秒一次的速度提前计划下一个批次。
你能做什么:
建议设置
maxRatePerPartition
约为实际容量的150-200%。只要让作业再运行一段时间,你就会看到估计的100%将是什么。当您在kafka中使用60个分区时,您需要确保数据均匀地分布在各个分区上。只有这样maxrateperpartition才会执行您想要执行的操作
拥有60个分区,您可以在spark集群中使用60个内核来获得最大的消耗速度。