我想将一个sql字符串作为用户输入,然后在执行之前对其进行转换。特别是,我想修改顶层投影(select子句),插入查询要检索的其他列。
我希望通过使用催化剂来实现这一点 sparkSession.experimental.extraOptimizations
. 我知道我所尝试的并不是严格意义上的优化(转换改变了sql语句的语义),但是api似乎仍然合适。但是,我的转换似乎被查询执行器忽略了。
下面是一个简单的例子来说明我遇到的问题。首先定义一个row case类:
case class TestRow(a: Int, b: Int, c: Int)
然后定义一个优化规则,简单地放弃任何投影:
object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case x: Project => x.child
}
}
现在创建一个数据集,注册优化,并运行sql查询:
// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)
// Register "optimisation".
sparkSession.experimental.extraOptimizations =
Seq(RemoveProjectOptimisationRule)
// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")
// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)
以下是输出:
Query result:
[1]
== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
+- 'UnresolvedRelation `testtable`
== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
+- SubqueryAlias testtable
+- LocalRelation [a#3, b#4, c#5]
== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]
== Physical Plan ==
* Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]
我们看到结果与原始sql语句的结果相同,没有应用转换。然而,当打印逻辑和物理计划时,投影确实被移除了。我还确认(通过调试日志输出)转换确实正在被调用。
对这里发生的事有什么建议吗?也许乐观主义者只是忽略了改变语义的“优化”?
如果使用优化不是办法,有人能建议一个替代方案吗?我真正想做的就是解析输入sql语句,转换它,并将转换后的ast传递给spark执行。但据我所知,实现这一点的api是spark的私有部分 sql
包裹。也许可以使用反射,但我想避免这种情况。
任何指点都将不胜感激。
2条答案
按热度按时间ugmeyewa1#
正如您所猜测的,这是失败的,因为我们假设优化器不会更改查询结果。
具体地说,我们缓存从分析器中出来的模式(并且假设优化器没有更改它)。在将行转换为外部格式时,我们使用此模式,因此将截断结果中的列。如果您做的不仅仅是截断(即更改了数据类型),这甚至可能会崩溃。
正如你在这个笔记本中看到的,它实际上产生了你在封面下所期望的结果。我们计划在不久的将来打开更多的钩子,让您在查询执行的其他阶段修改计划。详见spark-18127。
uujelgoq2#
michaelarmbrust的回答证实了这种转变不应该通过优化来实现。
我改为在spark中使用内部api来实现我现在想要的转换。它需要spark中包专用的方法。因此,通过将相关逻辑放入适当的包中,我们可以无需反射地访问它们。概述:
请注意,这当然可能与spark的未来版本相冲突。