我在hdfs中有以下管道,我正在spark中处理
输入表: batch, team, user, metric1, metric2
此表可以以每小时批处理的形式包含用户级度量。在同一个小时内,一个用户可以有多个条目。
级别1聚合:此聚合用于获取每个用户每批的最新条目 agg(metric1) as user_metric1, agg(metric2) as user_metric2 (group by batch, team, user)
2级聚合:获取团队级度量 agg(user_metric1) as team_metric1, agg(user_metric2) as team_metric2 (group by batch, team)
输入表的大小为8gb(snappyParquet格式),以hdfs表示。我的spark工作是将shuffle write显示为40gb,每个executor shuffle溢出至少1gb。
为了最小化这一点,如果我在执行聚合之前在用户级别重新划分输入表,
df = df.repartition('user')
它能提高性能吗?如果我想减少洗牌,我应该如何处理这个问题?
使用以下资源运行时
spark.executor.cores=6
spark.cores.max=48
spark.sql.shuffle.partitions=200
1条答案
按热度按时间lnlaulya1#
spark将数据从一个节点转移到另一个节点,因为资源(输入数据…)分布在集群上,这会使计算速度变慢,并且会在集群上产生大量的网络流量,对于您的情况,转移的数量是由于分组的缘故,如果根据goup的三列进行重新分区,它将减少随机数,对于spark配置,默认的spark.sql.shuffle.partitions是200,假设我们让spark配置保持原样,重新分区将需要一些时间,一旦完成计算将更快: