在所有spark任务完成后,我检测到一个意外的磁盘io(diskbusy峰值),但spark上下文没有停止——如图中的案例2所示 21:56:47
. 有谁能帮忙解释一下,并就如何避免或推迟提出建议吗?或者spark上下文是否有一些可能导致峰值的周期性异步io活动?谢谢!
给出了在两种情况下运行sparksql批处理作业的示例。在第一个示例中,我执行sql工作负载,然后立即停止spark上下文 .show()
动作结束。在第二种情况下,我增加了一分钟的睡眠后 .show()
通过使用 Thread.sleep(60000)
,然后停止spark上下文。结果表明,在两种情况下执行sql工作负载的时间开销是相似的,但是在第二种情况下,正在进行本地存储以进行无序写入的磁盘上出现了意外的diskbusy峰值。见案例2中的尖峰。
这里有更多的细节。
系统设置
spark 2.3.1、hadoop 2.9.1、hive 2.3.4用于元数据存储。
一个主节点和两个工作节点(worker1和worker2)。每个节点都有足够的可用资源(32核、750g内存和8个8-t磁盘,从disk1到disk8)。
hdfs部署在disk8上;disk1用于spark shuffle写入本地存储。
我使用Yarn作为集群管理。
我使用系统监视器工具“nmon”来检测磁盘活动。
后端没有其他大型应用程序在运行。
我用 yarn client
提交代码时的模式。我使用8个执行器,每个执行器有4个内核和8gb内存。
注意,我将hdfs和yarn本地文件放在两个不同的磁盘上-- yarn_local
目录位于每个工作节点的disk1上,hdfs部署在两个工作节点的disk8上。每个磁盘都有 8T
. 因此可以区分hdfs和本地磁盘的活动。
这是我目前的分析
它不是由磁盘本身和其他后台进程引起的。我尝试了disk2、disk3、disk4和disk8作为本地存储,以测试峰值是否与程序相关,并且每次执行case2时它都显示相同的峰值。
尖峰是由Spark本身引起的。我尝试了独立部署模式,尖峰仍然存在(没有Yarn)。
这可能与洗牌有关。我的目标批处理作业的总无序写入大小接近 2GB
. 我还尝试了不同的工作负载,其无序写入大小接近 1MB
, 250MB
以及 1GB
. 对于具有无序写入大小的批处理作业,diskbusy可以忽略不计 1MB
然后变成 80%
对于具有总洗牌写入大小的批处理作业 250MB
.
跟踪本地存储文件的大小。出现磁盘尖峰时,会检测到磁盘写入,但磁盘大小不会增加。因此,(1)可能与磁盘缓存清理无关(2)可能发生了一些磁盘交换(不太确定)。
根据我目前的分析,我怀疑这应该是由一些我不熟悉的东西引起的——比如磁盘上的一些spark异步行为。有人能解释一下吗?谢谢!
这是第一个案例。
这是第二种情况。
更清楚的是,图中 worker1 node local
表示worker1中的disk1, the worker2 local
表示worker2中的disk1;这个 worker1 node dfs
表示worker1中的disk8和 worker2 node dfs
表示worker2中的disk8,hdfs所在的位置。左y轴是由检测到的diskbusy(从0%到100%) nmon
右边的y轴是disk8中hdfs的目录大小(对于这个问题,我们可以忽略它)。
这是我的密码。
import org.apache.spark.sql.SparkSession
object Q16 {
def main(args: Array[String]): Unit = {
val db = s"bigbench_sf_100"
val spark = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
spark.sql(s"use $db")
val t1 = System.currentTimeMillis()
spark.sql(
s"""
|SELECT w_state, i_item_id,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_before,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_after
|FROM (
| SELECT *
| FROM web_sales ws
| LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
| AND ws.ws_item_sk = wr.wr_item_sk)
|) a1
|JOIN item i ON a1.ws_item_sk = i.i_item_sk
|JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
|JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
|GROUP BY w_state,i_item_id
|--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
|ORDER BY w_state,i_item_id
|LIMIT 100
""".stripMargin).show
val t2 = System.currentTimeMillis()
// For case 2
// Thread.sleep(60 * 1000)
spark.stop()
}
}
1条答案
按热度按时间8yparm6h1#
我找出了意外io活动的原因。
这是文件系统缓冲区缓存行为。通常,当进程写入文件时,数据不会立即写入磁盘,而是写入内存中的缓存。此缓存由操作系统/文件系统作为性能优化来维护,因为它允许写入请求在写入内存后返回,而不是等待缓慢的i/o完成。这些脏数据由操作系统定期刷新到后台的磁盘上。
因此,基本上,磁盘活动(刷新)是无法避免的,除非在磁盘缓冲区中缓存时删除文件页(在案例1中)。
您可以使用linux系统命令强制立即写出所有脏数据
sync
.