Will my Spark Structured Streaming code cause slow response times after running for a few hours?

Asked by xkftehaa on 2021-05-27, in Spark

The Spark job is functionally and logically correct. However, after running for a few hours it becomes slower and slower. Are there any pitfalls in the code below?

    import java.time.Instant

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types.{BooleanType, LongType, StructType}

    val query = "(select * from meta_table) as meta_data"
    val meta_schema = new StructType()
      .add("config_id", BooleanType)
      .add("threshold", LongType)

    var meta_df = spark.read.jdbc(url, query, connectionProperties)
    var meta_df_explode = meta_df
      .select(col("id"), from_json(col("config"), meta_schema).as("config"))
      .select("config_id", "threshold", "config.*")

    // rules_imsi_df: join of the Kafka ingestion stream with meta_df_explode
    // rules_monitoring_df: static DataFrame used for monitoring purposes

    val rules_monitoring_stream = rules_imsi_df.writeStream
      .outputMode("append")
      .format("memory")
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        if (!batchDF.isEmpty) {
          printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
          batchDF.show()
          batchDF.persist()
          val batchDF_group = batchDF
            .groupBy("id")
            .sum("volume")
            .withColumnRenamed("sum(volume)", "total_volume_id")
          rules_monitoring_df = rules_monitoring_df
            .join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left")
            .select(rules_monitoring_df("id"), batchDF_group("total_volume_id"))
            .na.fill(0)
          rules_monitoring_df = rules_monitoring_df
            .withColumn("volume", rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
          batchDF.unpersist()
        }
      }
      .start()

    while (rules_monitoring_stream.isActive) {
      Thread.sleep(240000)
      ... // Periodically reload the metadata from the database
      meta_df = spark.read.jdbc(url, query, connectionProperties)
      meta_df_explode = meta_df
        .select(col("id"), from_json(col("config"), meta_schema).as("config"))
        .select("config_id", "threshold", "config.*")
    }

Answer #1, by 6ju8rftf:

The following are my observations on the code above. I will add more if I find anything else.
Remove format("memory"): the memory sink is meant for debugging, not for production use cases. Since foreachBatch already handles the output, format can simply be dropped, as in the sketch below.
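A minimal sketch of the same stream definition without the memory sink; everything else is unchanged from the code in the question, and foreachBatch itself decides where each micro-batch goes:

    // Same query as in the question, with format("memory") removed.
    // foreachBatch acts as the sink, so no format() call is needed.
    val rules_monitoring_stream = rules_imsi_df.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // per-batch processing as before
      }
      .start()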
Remove all the vars (meta_df, meta_df_explode, rules_monitoring_df) from the code.
Do not assign a DataFrame back to rules_monitoring_df inside foreachBatch: foreachBatch may be invoked by multiple threads at the same time, so you can get wrong results. Instead, try saving the result to HDFS or a Hive table and reading it back when needed, as in the sketch below.
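A minimal sketch of that pattern, assuming a hypothetical parquet directory on HDFS (writing to a Hive table with saveAsTable would follow the same shape):

    import org.apache.spark.sql.functions.sum

    // Inside foreachBatch: append each micro-batch aggregate to external
    // storage instead of reassigning rules_monitoring_df on the driver.
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      if (!batchDF.isEmpty) {
        batchDF
          .groupBy("id")
          .sum("volume")
          .withColumnRenamed("sum(volume)", "total_volume_id")
          .write
          .mode("append")
          .parquet("hdfs:///tmp/rules_monitoring")  // hypothetical path
      }
    }

    // Wherever the running totals are needed, read them back and re-aggregate:
    val volume_by_id = spark.read
      .parquet("hdfs:///tmp/rules_monitoring")
      .groupBy("id")
      .agg(sum("total_volume_id").as("volume"))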
If possible, try moving the code below into a Spark streaming listener (StreamingQueryListener) and get rid of the extra while loop; a sketch of that approach follows the quoted loop.

    while (rules_monitoring_stream.isActive) {
      Thread.sleep(240000)
      ... // Periodically reload the metadata from the database
      meta_df = spark.read.jdbc(url, query, connectionProperties)
      meta_df_explode = meta_df
        .select(col("id"), from_json(col("config"), meta_schema).as("config"))
        .select("config_id", "threshold", "config.*")
    }
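A minimal sketch of the listener-based alternative, assuming the metadata reload can piggyback on each progress event. Note that onQueryProgress fires once per trigger (every 120 seconds here) rather than every 240 seconds as in the loop, and the meta_df/meta_df_explode vars are kept only to mirror the original code:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    // Reload the metadata whenever the query reports progress, instead of
    // polling with Thread.sleep on the driver.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        meta_df = spark.read.jdbc(url, query, connectionProperties)
        meta_df_explode = meta_df
          .select(col("id"), from_json(col("config"), meta_schema).as("config"))
          .select("config_id", "threshold", "config.*")
      }
    })

    rules_monitoring_stream.awaitTermination()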
