我正在尝试运行一个简单的查询,假设使用spark.sql(“query”)运行查询与使用dataframes相比没有性能差异,因为我使用的是spark 2.1.0,我有catalyst optimizer来处理优化部分&tungs enabled。
这里我用一个左外连接连接两个表。我的第一个表是200 gb,是驱动表(在左侧),第二个表是2gb,根据我们的业务要求,必须没有过滤器。
我的群集的配置。由于这是共享集群,我分配了一个特定的队列,允许我使用3 tb的内存(是3 tb),但vCore的数量是480。这意味着我只能运行480个并行任务。除此之外,在Yarn级别,我有一个限制,即每个节点最多有8个芯。最大16 gb的容器内存限制。因此,我不能给我的执行器内存(每节点)超过12 gb,因为我给了3 gb作为执行器内存更安全的一面,这成为每节点15 gb的内存利用率。
因此,在计算了480个允许的vcore总数以及每个节点8个核的限制之后,我得到了480/8=60个节点。这意味着60*15=900 gb的可用内存(我不知道为什么总队列内存分配为3 tb),这是峰值。。如果我是唯一一个使用队列的人,但情况并非总是如此。
现在的疑问是如何激发整个900 gb的内存。从数字和统计我可以清楚地说,我的工作将运行没有任何问题,因为我试图处理的数据大小只有210-250 gb的最大值&我有900 gb的可用内存。
但我总是被容器杀死。我不能增加Yarn容器的尺寸,因为它是在Yarn水平和整体集群将得到增加的容器大小,这是不正确的事情。我还尝试过使用sparksession.config(property)在代码中将vmem-check.enabled属性禁用为false,但这也没有帮助,因为我不允许在yarn级别更改任何内容,所以它可能会忽略这一点。
现在spark最初是基于什么来分割数据的?它是基于在集群级别定义的块大小(假设128 mb)我这样想的,因为当我的作业开始时,我看到我的大表(大约200 gb)有2000个任务,所以spark是基于什么来计算这2000个任务(分区),我认为这可能是默认的分区大小spark开始加载我的表,通过在spark ui的stage选项卡下看到输入大小/记录和shuffle write size/记录,我的表非常大,这就是为什么我得到容器终止错误和增加执行器内存开销的建议的原因,这也没有帮助。
我尝试将数据从10k分区重新分区到100k分区,并尝试将其持久化为仅内存\u、内存\u和磁盘\u、仅磁盘\u,但没有任何帮助。我的很多任务都失败了,最后工作也失败了。有时用容器杀死,直接缓冲等。
现在来看看持久化/缓存有什么用,它是如何工作的。。????我在做什么
val result = spark.sql("query big_table").repartition(10000, $<column name>).persist()
重新分区中的列是连接键,因此它将被分发。为了在连接之前完成这项工作,我正在执行result.show(1)。所以这个动作被执行,数据被持久保存在磁盘上,spark将读取持久保存在磁盘上的数据以进行连接,内存将不会有任何负载,因为它存储在磁盘上的小块中(我在这里说的对吗
为什么在配置单元中使用相同的大表加上一些具有左连接的附加表来完成相同的作业。虽然这需要时间,但它成功地完成,但它失败的Spark。。??为什么?spark不是Hive的完全替代品吗。。??spark在溢出到磁盘时的工作方式与hive不同&在使用磁盘进行持久化时将数据写入磁盘。
如果我们有较少的容器尺寸,但节点数量较多,Yarn容器尺寸会起作用吗??
spark是否合并所有可用节点的内存(根据容器大小,每个节点15 gb)并将它们合并以加载一个大分区。。??
暂无答案!
目前还没有任何答案,快来回答吧!