环境:
spark-1.2.0-bin-hadoop2.4版本
spark-submit --class MyClass \
--master yarn-client \
--num-executors 20 --executor-memory 5g \
--driver-memory 4g --executor-cores 4 \
MyJar.jar
大家好,
最近我得到一个查询,它确实在同一个表上联接
SELECT columns_I_need
FROM
(
SELECT blablabla,column_for_join
FROM a_huge_table
WHERE common_expr AND expr_A
) a
LEFT JOIN
(
SELECT somethingelse,column_for_join
FROM a_huge_table
WHERE common_expr AND expr_B
) b
ON a.column_for_join = b.column_for_join
GROUP BY
columns_I_need
``` `a_huge_table` 是一个非常巨大的 `column_for_join` 没有索引(我不是dba,我也没办法-他们拒绝)
此查询在hive cli中需要3~5分钟(总共2个阶段,Map总数<200),但在使用 `HiveContext.sql()` ,事情以非常糟糕的方式崩溃->4个阶段,30分钟/阶段。
使用 `spark.sql.codegen` 将阶段数减少到2个,但每个阶段的时间成本不会减少。因为这个表只使用一次,所以我假设hivecontext.cachetable()不会有多大帮助。我想spark中使用的方法和hive有些不同。
我试图挖掘Spark源,发现很难理解: `HiveContext.sql(SqlText)` 返回 `SchemaRDD(HiveContext, LogicalPlan_Created_by_Parser_using_SqlText )` ,我看到在trait中使用了logicalplan `SchemaRDDLike` => `sqlContext.executePlan` => `executedPlan.execute()` => `SparkPlan.execute()` 但是我找不到 `SparkPlan.execute()` 调用了任何重写版本 `SELECT` . 然后我被卡住了。我还是不明白spark是如何从Hive中获取数据的。
那么sparksql是如何工作的呢?我想知道是否有任何文档可以阅读以更好地理解sparksql?
顺便说一句, `HiveContext().sql():SchemaRDD` 不再是了,分支大师的版本现在是 `HiveContext().sql():DataFrame` . 他们改变的太快了。
暂无答案!
目前还没有任何答案,快来回答吧!