使用如下语句
val sql = s"""Select <column names> from <source> where <filter>"""
spark.sql(sql)
字符串
然后稍后运行查询,例如:
val filteredDF = df.filter(filterExp)
or
val selectDF = df.select(col1, col2, col3)
型
我在想:
1.是否在.sql()
命令中应用了 predicate 和投影下推的优化?
1.如果以后运行.filter()
或.select()
操作,这些操作是否会被推到读级别?
通过对.explain()
运行一些检查,似乎在(2.)中这些下推操作没有发生。为什么?
1条答案
按热度按时间3ks5zfa01#
简而言之,是的。Per Abdennacer的响应sql(“”)将sql解析为与使用API相同的指令(计划+表达式树)。进一步的过滤器或选择投影也会这样做。
然而,可以下推的内容是有限制的,特别是只有支持各种原语比较的文字才能下推。sources接口显示了源代码级别的可能性。
实际支持的操作取决于底层源。
字面量的限制可以通过我经历/工作过的几种方式表现出来:
第一个是Spark表达式或自定义表达式内部的东西,后者很容易在普通用户代码中触发:
字符串
as_uuid函数接受两个长整型并转换为字符串uuid,所以上面的过滤是在一个实际上不存在于底层_table中的字符串上进行的。解决这个问题需要编写一个自定义计划,该计划要足够早地运行,以便将过滤器转换为底层较低和较高收益率下推的过滤器。
在使用不同的过滤器/引擎时,术语之间也可能存在差异,在Databricks photon/delta下,您不会在计划中看到PredicatePushdown,而是类似于过滤器(我忘记了实际术语)。
要想知道为什么你的查询没有下推 predicate ,你需要指定使用的sql和源类型。
值得注意的是,即使下推是可能的,它也不会自动提供减速带-例如,如果没有为您的领域收集 parquet /三角形的统计数据,您最终还是会扫描。
(also值得注意的是,投影侧投影过滤器也用于减少从文件加载的内容,并且对于这意味着什么是源相关的,尽管对于列格式,在读取中可以简单地跳过未使用的数据)。