我正在尝试设置一个pyspark作业,它在每天约700gb的数据量上估计p25、p50、p75、p90。我用40个工作节点运行这个作业,每个节点有32g内存和8个vcpu,但最终运行了15个小时才能完成。我假设延迟是由于值需要在节点间排序以计算百分位数。有没有其他方法可以加快这一进程?
输入数据模式-
root
|-- processed_date: date (nullable = true)
|-- id: string (nullable = true)
|-- experiment: string (nullable = true)
|-- type: string (nullable = true)
|-- value: double (nullable = true)
|-- revision: string (nullable = true)
|-- source: string (nullable = true)
|-- region: string (nullable = true)
df_agg = df.groupby('processed_date', 'id', 'experiment', 'type').agg(
F.min('value').alias('min'),
F.max('value').alias('max'),
F.avg('value').alias('avg'),
F.expr('percentile(value, 0.25)').alias('p25'),
F.expr('percentile(value, 0.50)').alias('p50'),
F.expr('percentile(value, 0.75)').alias('p75'),
F.expr('percentile(value, 0.90)').alias('p90'))
谢谢!
2条答案
按热度按时间kuhbmx9i1#
你可以尝试重新分区
DataFrame.repartition
列上的Dataframedf = df.repartition('processed_date', 'id', 'experiment', 'type')
这样,与上述列的组合相关的所有记录都将位于同一节点中。6ojccjat2#
仅使用列来重新分区意味着它在表达式中使用的列上使用哈希分区器
spark.sql.shuffle.partitions
,因此在默认的无序分区不足够的情况下,这将不能很好地工作(默认值为200
)u应设置
numPartitions as well as column expressions
. 对于这种情况,我会这样做:或者在应用重新分区(仅使用列)之前,设置洗牌分区:
我建议您在应用groupby之前重新分区并溢出到磁盘,以便利用
adequate partitioning and in-memory computing
(确保一次通过):使用溢出到磁盘的数据仍然比完全不放入内存要快。
numpartitions的计算方法是
workers * cores * (2 or 3)
(因为几乎所有的现代虚拟内核都是多线程的)它给出了8403=960,我四舍五入到1000