如何避免不必要的 Shuffle 在pyspark?

93ze6v8z  于 2023-01-01  发布在  Spark
关注(0)|答案(1)|浏览(258)

我有两个CSV:df_salesdf_products。我希望使用pyspark执行以下操作:
1.将df_salesdf_products合并到product_id上。df_merged = df_sales.join(df_products,df_sales.product_id==df_products.product_id,"inner")
1.计算每个产品df_sales.num_pieces_sold的总和。df_sales.groupby("product_id").agg(sum("num_pieces_sold"))
1和2都需要在product_id上混洗df_sales
如何避免将df_sales打乱2次?

rslzwgfq

rslzwgfq1#

一种解决方案是使用repartition对 Dataframe 进行一次 Shuffle ,然后使用cache将结果保存在内存中:

  1. cached_df_sales = df_sales.repartition("product_id").cache()
  2. # and then do your work
  3. cached_df_sales\
  4. .join(df_products,cached_df_sales.product_id==df_products.product_id,"inner")
  5. cached_df_sales.groupby("product_id").agg(sum("num_pieces_sold"))

然而,我不确定这是不是一个好主意。缓存整个df_sales Dataframe 可能会占用大量内存,这取决于它的大小。而且,groupBy只会重排 Dataframe 的两列,这可能会相当便宜。在试图避免重排之前,我将首先确保这一点。
更一般地说,在尝试优化任何东西之前,简单地编写它,运行它,看看哪些东西需要时间,并专注于此。

相关问题