在我讨论这个问题之前,让我先谈谈问题背景:我们目前使用的spark版本是2.2,我们计划在不久的将来迁移到spark3.0。在迁移之前,我们在spark2.2和spark3.0中测试一些查询,以检查潜在的问题。这些查询的数据源表是spark2.2编写的orc格式。
spark3.0默认使用本机读取器读取orc文件(使用2个标志来启用此功能:set spark.sql.hive.convertmatastoreorc=true,set spark.sql.orc.impl=native),我发现即使应用了列修剪,spark3.0的本机读取器也将读取所有列。这将降低读取数据的速度。
e、 g.查询:从表a中选择列a;
表a有100列,而此查询只有1列是只读的。列剪枝在物理规划中的应用。但是filescanrd将读取所有100列。
然后我做远程调试。在orcutils.scala的requestedcolumnids方法中,它将检查字段名是否以“\u col”开头。在我的例子中,字段名以“\u col”开头,比如“\u col1”、“\u col2”。所以李子考还没完。然后在下面的代码逻辑中读取所有列。此代码如下:
def requestedColumnIds(
isCaseSensitive: Boolean,
dataSchema: StructType,
requiredSchema: StructType,
reader: Reader,
conf: Configuration): Option[(Array[Int], Boolean)] = {
...
if (orcFieldNames.forall(_.startsWith("_col"))) {
// This is a ORC file written by Hive, no field names in the physical schema, assume the
// physical schema maps to the data scheme by index.
assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
"no idea which columns were dropped, fail to read.")
// for ORC file written by Hive, no field names
// in the physical schema, there is a need to send the
// entire dataSchema instead of required schema.
// So pruneCols is not done in this case
Some(requiredSchema.fieldNames.map { name =>
val index = dataSchema.fieldIndex(name)
if (index < orcFieldNames.length) {
index
} else {
-1
}
}, false)
...
}
这个代码注解是否意味着spark的本机reader不支持orc旧格式的列修剪,即哪种模式是列索引而不是实际的列名?
切换回配置单元读取器可以通过设置标志来解决此问题:set spark.sql.hive.convertmatastoreorc=false,set spark.sql.orc.impl=hive。但这不是一个好主意,因为本地阅读器提供了Hive阅读器所没有的优化,例如矢量化阅读。
这是一种使用本机reader而不存在列修剪问题的方法吗?
如果您能提供帮助或建议,谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!