我有大量的数据,我希望高效地(即使用一个相对较小的spark集群)执行其中一列的count和distinct操作。
如果我做了看起来很明显的事情(将数据加载到Dataframe中):
df = spark.read.format("CSV").load("s3://somebucket/loadsofcsvdata/*").toDF()
df.registerView("someview")
然后尝试运行查询:
domains = sqlContext.sql("""SELECT domain, COUNT(id) FROM someview GROUP BY domain""")
domains.take(1000).show()
我的集群只是崩溃和烧毁-抛出内存异常或挂起/崩溃/没有完成操作。
我猜在这条路上的某个地方有某种连接会让执行者的记忆荡然无存?
当源数据规模很大而目标数据规模不大时,执行这样的操作的理想方法是什么(上面查询中的域列表相对较短,应该很容易放入内存)
此问题提供的相关信息:spark.sql.shuffle.partitions的最佳值应该是多少,或者在使用spark sql时如何增加分区?
1条答案
按热度按时间hgc7kmma1#
我建议调整你的执行者设置。特别是,正确设置以下参数可以显著提高性能。
在您的情况下,我还建议根据需要调整分区的数量,特别是将下面的param从默认值200增加到更高的值。