kafka direct consumer开始将读取限制为每批(5秒)450个事件(5*90个分区),在此之前的1到2天内(每批大约5000到40000个事件)运行良好
我使用在aws中运行的spark独立集群(spark和spark流kafka版本1.6.1),并使用s3 bucket作为检查点目录 StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext)
,每个工作节点上没有调度延迟和足够的磁盘空间。
没有更改任何kafka客户端初始化参数,非常确定kafka的结构没有更改:
val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
也不明白为什么当消费者直接描述说 The consumed offsets are by the stream itself
在创建流式处理上下文时,我仍然需要使用检查点目录?
1条答案
按热度按时间xn1cxnb41#
这通常是通过设置启用背压的结果
spark.streaming.backpressure.enabled
是真的。通常,当backpressure算法看到有更多的数据进入,然后它习惯了,它开始封顶每一批相当小的大小,直到它可以“稳定”自己再次。这有时会出现误报,并导致流的处理速度减慢。如果您想稍微调整一下启发式,它使用了一些未记录的标志(只需确保您知道自己在做什么):
如果你想知道血淋淋的细节,
PIDRateEstimator
就是你要找的。