spark流Kafka消费者直接消费速度下降

jk9hmnmh  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(403)


kafka direct consumer开始将读取限制为每批(5秒)450个事件(5*90个分区),在此之前的1到2天内(每批大约5000到40000个事件)运行良好
我使用在aws中运行的spark独立集群(spark和spark流kafka版本1.6.1),并使用s3 bucket作为检查点目录 StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext) ,每个工作节点上没有调度延迟和足够的磁盘空间。
没有更改任何kafka客户端初始化参数,非常确定kafka的结构没有更改:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

也不明白为什么当消费者直接描述说 The consumed offsets are by the stream itself 在创建流式处理上下文时,我仍然需要使用检查点目录?

xn1cxnb4

xn1cxnb41#

这通常是通过设置启用背压的结果 spark.streaming.backpressure.enabled 是真的。通常,当backpressure算法看到有更多的数据进入,然后它习惯了,它开始封顶每一批相当小的大小,直到它可以“稳定”自己再次。这有时会出现误报,并导致流的处理速度减慢。
如果您想稍微调整一下启发式,它使用了一些未记录的标志(只需确保您知道自己在做什么):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

如果你想知道血淋淋的细节, PIDRateEstimator 就是你要找的。

相关问题