为什么在完成作业和关闭spark之间会出现磁盘繁忙高峰?

8nuwlpux  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(383)

在所有spark任务完成后,我检测到一个意外的磁盘io(diskbusy峰值),但spark上下文没有停止——如图中的案例2所示 21:56:47 . 有谁能帮忙解释一下,并就如何避免或推迟提出建议吗?或者spark上下文是否有一些可能导致峰值的周期性异步io活动?谢谢!
给出了在两种情况下运行sparksql批处理作业的示例。在第一个示例中,我执行sql工作负载,然后立即停止spark上下文 .show() 动作结束。在第二种情况下,我增加了一分钟的睡眠后 .show() 通过使用 Thread.sleep(60000) ,然后停止spark上下文。结果表明,在两种情况下执行sql工作负载的时间开销是相似的,但是在第二种情况下,正在进行本地存储以进行无序写入的磁盘上出现了意外的diskbusy峰值。见案例2中的尖峰。
这里有更多的细节。
系统设置
spark 2.3.1、hadoop 2.9.1、hive 2.3.4用于元数据存储。
一个主节点和两个工作节点(worker1和worker2)。每个节点都有足够的可用资源(32核、750g内存和8个8-t磁盘,从disk1到disk8)。
hdfs部署在disk8上;disk1用于spark shuffle写入本地存储。
我使用Yarn作为集群管理。
我使用系统监视器工具“nmon”来检测磁盘活动。
后端没有其他大型应用程序在运行。
我用 yarn client 提交代码时的模式。我使用8个执行器,每个执行器有4个内核和8gb内存。
注意,我将hdfs和yarn本地文件放在两个不同的磁盘上-- yarn_local 目录位于每个工作节点的disk1上,hdfs部署在两个工作节点的disk8上。每个磁盘都有 8T . 因此可以区分hdfs和本地磁盘的活动。
这是我目前的分析
它不是由磁盘本身和其他后台进程引起的。我尝试了disk2、disk3、disk4和disk8作为本地存储,以测试峰值是否与程序相关,并且每次执行case2时它都显示相同的峰值。
尖峰是由Spark本身引起的。我尝试了独立部署模式,尖峰仍然存在(没有Yarn)。
这可能与洗牌有关。我的目标批处理作业的总无序写入大小接近 2GB . 我还尝试了不同的工作负载,其无序写入大小接近 1MB , 250MB 以及 1GB . 对于具有无序写入大小的批处理作业,diskbusy可以忽略不计 1MB 然后变成 80% 对于具有总洗牌写入大小的批处理作业 250MB .
跟踪本地存储文件的大小。出现磁盘尖峰时,会检测到磁盘写入,但磁盘大小不会增加。因此,(1)可能与磁盘缓存清理无关(2)可能发生了一些磁盘交换(不太确定)。
根据我目前的分析,我怀疑这应该是由一些我不熟悉的东西引起的——比如磁盘上的一些spark异步行为。有人能解释一下吗?谢谢!
这是第一个案例。

这是第二种情况。

更清楚的是,图中 worker1 node local 表示worker1中的disk1, the worker2 local 表示worker2中的disk1;这个 worker1 node dfs 表示worker1中的disk8和 worker2 node dfs 表示worker2中的disk8,hdfs所在的位置。左y轴是由检测到的diskbusy(从0%到100%) nmon 右边的y轴是disk8中hdfs的目录大小(对于这个问题,我们可以忽略它)。
这是我的密码。

import org.apache.spark.sql.SparkSession
object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state,i_item_id
         |--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
         |ORDER BY w_state,i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

//    For case 2
//    Thread.sleep(60 * 1000)

    spark.stop()
  }
}
8yparm6h

8yparm6h1#

我找出了意外io活动的原因。
这是文件系统缓冲区缓存行为。通常,当进程写入文件时,数据不会立即写入磁盘,而是写入内存中的缓存。此缓存由操作系统/文件系统作为性能优化来维护,因为它允许写入请求在写入内存后返回,而不是等待缓慢的i/o完成。这些脏数据由操作系统定期刷新到后台的磁盘上。
因此,基本上,磁盘活动(刷新)是无法避免的,除非在磁盘缓冲区中缓存时删除文件页(在案例1中)。
您可以使用linux系统命令强制立即写出所有脏数据 sync .

相关问题