apachespark—在循环中使用sparkdf.write.saveastable()会导致作业之间的延迟呈指数级增加

ehxuflar  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(351)

我需要在for循环中执行一组不同的配置单元查询。

hc=HiveContext(sc)
queryList=[set of queries]
for i in range(0,X):
    hc.sql(queryList[i])
    sparkDF.write.saveAsTable('hiveTable', mode='append')

虽然这个代码对于较小的x值很有用,但是当x>100时它会引起问题。每个saveastable作业之间的延迟呈指数增长,但每个作业或多或少都需要一个常数5s。
我试图纠正这一切却没有任何运气:
在for循环中添加一次gc.collect()(i%100==0)。但这打破了for循环
关闭当前的spark和hive上下文一次(i%100==0)并创建一个新的上下文-这仍然不能解决问题
用Yarn簇代替Yarn客户-不走运!
是否有这样的选项,我创建一个到配置单元的连接,并在每次调用saveastable函数时关闭它?或者清理司机?

vpfxa7rd

vpfxa7rd1#

这是因为您使用for循环,它在spark驱动程序模式下执行,而不是分布在集群工作节点上,这意味着它没有使用并行能力,或者没有在工作节点上执行。尝试使用parallelize和一个分区来创建rdd,这将有助于在工作节点上生成作业
或者,如果只想处理hivecontext,可以创建全局hivecontext,如val hivectx=new hivecontext(sc),并在循环内重用。
在集群上运行作业时,还可以更改/优化执行器的数量,以提高性能

相关问题