如何让spark处理更大的数据集?

hof1towb  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(290)

我有一个非常复杂的查询,需要用一些“groupby”表达式连接9个或更多的表。大多数表的行数几乎相同。这些表还具有一些列,这些列可用作划分表的“键”。
以前,应用程序运行良好,但现在数据集的数据是以前的3~4倍。我的测试结果表明,如果每个表的行数小于4000000,应用程序仍然可以很好地运行。但是,如果计数大于此值,应用程序将写入数百TB的洗牌,应用程序将暂停(无论我如何调整内存、分区、执行器等)。实际数据可能只有几十克。
我认为,如果分区工作正常,spark就不应该做太多的洗牌,应该在每个节点上进行连接。令人费解的是,为什么spark这么做并不那么“聪明”。
我可以将数据集(使用上面提到的“key”)拆分为许多数据集,这些数据集可以独立处理。但负担将在我自己身上…这正是使用spark的理由。还有什么方法可以帮助你?
我在hadoop上使用spark2.0。

oiopk7p5

oiopk7p51#

我的测试结果表明,如果每个表的行数小于4000000,应用程序仍然可以很好地运行。但是,如果计数大于此值,应用程序将写入数百TB的洗牌
当连接数据集时,如果一方的大小小于某个可配置的大小,spark会将整个表广播给每个执行器,以便可以在任何地方本地执行连接。你的上述观察与此相符。您还可以显式地向spark提供广播提示,如下所示 df1.join(broadcast(df2)) 除此之外,你能提供更多关于你的问题的细节吗?
[前一段时间,我还在为一个必须处理几个tb的工作处理加入和洗牌的问题。我们使用的是RDD(而不是数据集api)。我把我的发现写在这里。这些可能对您试图解释底层数据混乱的原因有些帮助。]
更新:根据文件-- spark.sql.autoBroadcastJoinThreshold 是可配置的属性键。 10 MB 是其默认值。它的作用如下:
配置在执行联接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为-1,可以禁用广播。请注意,当前统计信息仅支持已运行命令analyze table compute statistics noscan的配置单元元存储表。
显然,这只支持配置单元表。

相关问题