我需要在for循环中执行一组不同的配置单元查询。
hc=HiveContext(sc)
queryList=[set of queries]
for i in range(0,X):
hc.sql(queryList[i])
sparkDF.write.saveAsTable('hiveTable', mode='append')
虽然这个代码对于较小的x值很有用,但是当x>100时它会引起问题。每个saveastable作业之间的延迟呈指数增长,但每个作业或多或少都需要一个常数5s。
我试图纠正这一切却没有任何运气:
在for循环中添加一次gc.collect()(i%100==0)。但这打破了for循环
关闭当前的spark和hive上下文一次(i%100==0)并创建一个新的上下文-这仍然不能解决问题
用Yarn簇代替Yarn客户-不走运!
是否有这样的选项,我创建一个到配置单元的连接,并在每次调用saveastable函数时关闭它?或者清理司机?
1条答案
按热度按时间vpfxa7rd1#
这是因为您使用for循环,它在spark驱动程序模式下执行,而不是分布在集群工作节点上,这意味着它没有使用并行能力,或者没有在工作节点上执行。尝试使用parallelize和一个分区来创建rdd,这将有助于在工作节点上生成作业
或者,如果只想处理hivecontext,可以创建全局hivecontext,如val hivectx=new hivecontext(sc),并在循环内重用。
在集群上运行作业时,还可以更改/优化执行器的数量,以提高性能