如何避免pyspark中join操作中的过度洗牌?

vkc1a9a2  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(453)

我有一个大约25GB大小的大sparkDataframe,我必须与另一个大约15GB大小的Dataframe连接。
现在,当我运行代码时,大约需要15分钟才能完成
资源分配是40个执行器,每个执行器有128 gb内存
当我查看它的执行计划时,正在执行sort merge join。
问题是:
连接在同一个键但不同的表上执行大约5到6次,因为在为每个执行的连接合并/连接数据之前,大部分时间都在对数据进行排序和对分区进行共同定位。
那么,在执行连接之前,是否有任何方法可以对数据进行排序,这样就不会对每个连接执行排序操作,也不会以这样的方式进行优化:排序所需的时间更少,实际连接数据所需的时间更多?
我只想在执行连接之前对Dataframe进行排序,但不确定如何进行?
例如:
如果我的Dataframe在id列上连接

joined_df = df1.join(df2,df1.id==df2.id)

如何在加入之前根据“id”对Dataframe进行排序,以便分区位于同一位置?

ndh0cuux

ndh0cuux1#

在过去,我通过按连接列重新划分输入Dataframe获得了很好的结果。这允许spark执行本地连接,最大限度地减少洗牌。例如

joined_df = df1.repartition(num_partitions,'id').join(df2.repartition(num_partitions, 'id'),on=['id'])

在这个答案中,还建议在读取后立即重新分区以使用分区本地操作。

r1zhe5dt

r1zhe5dt2#

那么,在执行连接之前,是否有任何方法可以对数据进行排序,这样就不会对每个连接执行排序操作,也不会以这样的方式进行优化:排序所需的时间更少,实际连接数据所需的时间更多?
闻起来像是木桶的味道。
bucketing是一种优化技术,它使用bucket(和bucketing列)来确定数据分区并避免数据洗牌。
我们的想法是 bucketBy 这些数据集使spark知道密钥位于同一位置(已经预先洗牌)。在参与连接的Dataframe中,bucket和bucketing列的数量必须相同。
请注意,这是支持配置单元或Spark表( saveAsTable )当从元存储(spark或hive)获取bucket元数据时。

相关问题