我有一个大约25GB大小的大sparkDataframe,我必须与另一个大约15GB大小的Dataframe连接。
现在,当我运行代码时,大约需要15分钟才能完成
资源分配是40个执行器,每个执行器有128 gb内存
当我查看它的执行计划时,正在执行sort merge join。
问题是:
连接在同一个键但不同的表上执行大约5到6次,因为在为每个执行的连接合并/连接数据之前,大部分时间都在对数据进行排序和对分区进行共同定位。
那么,在执行连接之前,是否有任何方法可以对数据进行排序,这样就不会对每个连接执行排序操作,也不会以这样的方式进行优化:排序所需的时间更少,实际连接数据所需的时间更多?
我只想在执行连接之前对Dataframe进行排序,但不确定如何进行?
例如:
如果我的Dataframe在id列上连接
joined_df = df1.join(df2,df1.id==df2.id)
如何在加入之前根据“id”对Dataframe进行排序,以便分区位于同一位置?
2条答案
按热度按时间ndh0cuux1#
在过去,我通过按连接列重新划分输入Dataframe获得了很好的结果。这允许spark执行本地连接,最大限度地减少洗牌。例如
在这个答案中,还建议在读取后立即重新分区以使用分区本地操作。
r1zhe5dt2#
那么,在执行连接之前,是否有任何方法可以对数据进行排序,这样就不会对每个连接执行排序操作,也不会以这样的方式进行优化:排序所需的时间更少,实际连接数据所需的时间更多?
闻起来像是木桶的味道。
bucketing是一种优化技术,它使用bucket(和bucketing列)来确定数据分区并避免数据洗牌。
我们的想法是
bucketBy
这些数据集使spark知道密钥位于同一位置(已经预先洗牌)。在参与连接的Dataframe中,bucket和bucketing列的数量必须相同。请注意,这是支持配置单元或Spark表(
saveAsTable
)当从元存储(spark或hive)获取bucket元数据时。