The Spark job is functionally and logically correct. However, after a few hours of running it becomes slower and slower. Are there any pitfalls in the code below?
val query = "(select * from meta_table) as meta_data"
val meta_schema = new StructType()
  .add("config_id", BooleanType)
  .add("threshold", LongType)
var meta_df = spark.read.jdbc(url, query, connectionProperties)
var meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "threshold", "config.*")
// rules_imsi_df: join of the Kafka ingestion stream with meta_df_explode
// rules_monitoring_df: static DataFrame used for monitoring purposes
val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      var batchDF_group = batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)", "total_volume_id")
      rules_monitoring_df = rules_monitoring_df.join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left").select(rules_monitoring_df("id"), batchDF_group("total_volume_id")).na.fill(0)
      rules_monitoring_df = rules_monitoring_df.withColumn("volume", rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
      batchDF.unpersist()
    }
  }
  .start()
while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... // Periodically reload the metadata from the database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df.select(col("id"), from_json(col("config"), meta_schema).as("config")).select("config_id", "threshold", "config.*")
}
1 Answer
Here are my observations on the code above. I will add to them if I find anything else.
- Remove `format("memory")`: it is intended for debugging, not for production use.
- Remove all the `var`s from the code.
- Do not assign a `DataFrame` back to `rules_monitoring_df` inside `foreachBatch` like this. `foreachBatch` can be invoked concurrently by multiple threads, so you may get wrong results. Instead, try saving each batch's results to HDFS or a Hive table and reading them back when needed.
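A minimal sketch of that last point, reusing the question's `rules_imsi_df` stream. The output path `/data/rules_monitoring` is a hypothetical example; each micro-batch's aggregates are appended to external storage instead of being merged into a mutable driver-side variable:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.streaming.Trigger

rules_imsi_df.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      batchDF.persist()
      // Per-batch aggregate, same as the original groupBy/sum.
      val batchAgg = batchDF
        .groupBy("id")
        .agg(sum("volume").as("total_volume_id"))
      // Append-only write; a separate batch query can aggregate across
      // batches on read instead of mutating a driver variable.
      batchAgg.write.mode("append").parquet("/data/rules_monitoring")
      batchDF.unpersist()
    }
  }
  .start()
```

Because every micro-batch only appends, concurrent invocations of `foreachBatch` no longer race on shared driver state.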
- Try moving the code below (the periodic metadata reload) into a Spark streaming listener and, if possible, avoid the extra `while` loop.
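A sketch of the listener idea, assuming the same `url`, `query`, and `connectionProperties` as in the question and keeping the original 240-second interval. An `AtomicReference` holds the latest metadata snapshot so readers never see a half-updated value:

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val metaRef = new AtomicReference[DataFrame]()
@volatile var lastRefresh = 0L

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val now = System.currentTimeMillis()
    if (now - lastRefresh > 240000L) { // refresh at most every 240 s
      lastRefresh = now
      metaRef.set(spark.read.jdbc(url, query, connectionProperties))
    }
  }
})
```

`onQueryProgress` fires after each micro-batch, so the refresh piggybacks on the stream's own cadence and the blocking `while`/`Thread.sleep` loop on the driver is no longer needed.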