如何提高groupby聚合的执行时间来计算spark中的百分位数?

4uqofj5v  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(382)

我正在尝试设置一个pyspark作业,它在每天约700gb的数据量上估计p25、p50、p75、p90。我用40个工作节点运行这个作业,每个节点有32g内存和8个vcpu,但最终运行了15个小时才能完成。我假设延迟是由于值需要在节点间排序以计算百分位数。有没有其他方法可以加快这一进程?
输入数据模式-

root
 |-- processed_date: date (nullable = true)
 |-- id: string (nullable = true)
 |-- experiment: string (nullable = true)
 |-- type: string (nullable = true)
 |-- value: double (nullable = true)
 |-- revision: string (nullable = true)
 |-- source: string (nullable = true)
 |-- region: string (nullable = true)
df_agg = df.groupby('processed_date', 'id', 'experiment', 'type').agg(
                             F.min('value').alias('min'),
                             F.max('value').alias('max'),
                             F.avg('value').alias('avg'),
                             F.expr('percentile(value, 0.25)').alias('p25'),
                             F.expr('percentile(value, 0.50)').alias('p50'),
                             F.expr('percentile(value, 0.75)').alias('p75'),
                             F.expr('percentile(value, 0.90)').alias('p90'))

谢谢!

kuhbmx9i

kuhbmx9i1#

你可以尝试重新分区 DataFrame.repartition 列上的Dataframe df = df.repartition('processed_date', 'id', 'experiment', 'type') 这样,与上述列的组合相关的所有记录都将位于同一节点中。

6ojccjat

6ojccjat2#

仅使用列来重新分区意味着它在表达式中使用的列上使用哈希分区器 spark.sql.shuffle.partitions ,因此在默认的无序分区不足够的情况下,这将不能很好地工作(默认值为 200 )
u应设置 numPartitions as well as column expressions . 对于这种情况,我会这样做:

df=df.repartition(1000, *['processed_date', 'id', 'experiment', 'type'])

或者在应用重新分区(仅使用列)之前,设置洗牌分区:

spark.conf.set("spark.sql.shuffle.partitions",1000)

df=df.repartition(*['processed_date', 'id', 'experiment', 'type'])`

我建议您在应用groupby之前重新分区并溢出到磁盘,以便利用 adequate partitioning and in-memory computing (确保一次通过):
使用溢出到磁盘的数据仍然比完全不放入内存要快。

from pyspark.storagelevel import StorageLevel

df=df.repartition(1000, *['processed_date', 'id', 'experiment', 'type'])\
      .persist(StorageLevel.MEMORY_AND_DISK)

numpartitions的计算方法是 workers * cores * (2 or 3) (因为几乎所有的现代虚拟内核都是多线程的)它给出了8403=960,我四舍五入到1000

相关问题