我正在运行一个结构化的Kafka源流作业。
Spark:2.4.7
python:3.6.8版
spark = SparkSession.builder.getOrCreate()
ds = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", topic_name)
.load())
# data preprocessing
ds = ...
model = GBTClassificationModel.load(model_path)
ds = model.transform(ds)
query = (ds.writeStream
.outputMode("update")
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("checkpointLocation", checkpoint_dir)
.option("topic", output_topic)
.trigger(processingTime="0 seconds")
.start())
query.awaitTermination()
spark web ui显示以下指示器: AddBatch
: 1955.0 GetBatch
: 1.0 GetOffset
: 0.0 QueryPlanning
: 3555.0 TriggerExecution
: 2.0 WalCommit
: 5569.0 undefined
: 20.0
以下是星火官方网站各项指标的说明:
操作持续时间。执行各种操作所用的时间(毫秒)。跟踪操作如下所示。 addBatch
:从源读取微批的输入数据、处理数据以及将批的输出写入接收器所用的时间。这将占用微批次的大部分时间。 getBatch
:准备逻辑查询以从源读取当前微批的输入所用的时间。 latestOffset & getOffset
:查询此源的最大可用偏移量所用的时间。 queryPlanning
:生成执行计划所用的时间。 walCommit
:将偏移量写入元数据日志所用的时间。
你为什么 WalCommit
以及 QueryPlanning
远大于 AddBatch
?
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!