spark:为mergeschema字段选择默认值

fdx2calv  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(753)

我有一个Parquet地板,有一个像这样的旧模式:

| name | gender | age |
| Tom  | Male   | 30  |

当我们的模式更新到:

| name | gender | age | office |

我们在阅读旧Parquet地板时使用了mergeschema:

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")

但在读取这些旧Parquet文件时,我得到了以下输出:

| name | gender | age | office |
| Tom  | Male   | 30  | null   |

这很正常。但是我想取一个默认值 office (例如“california”),当且仅当字段不存在于旧模式中时。有可能吗?

bwitn5fc

bwitn5fc1#

当列在某些Parquet文件中不存在,但在其他Parquet文件中存在时,您没有任何简单的方法来放置默认值
在Parquet文件格式中,每个Parquet文件都包含模式定义。默认情况下,读取parquet时,spark从parquet文件获取模式。唯一的影响 mergeSchema 选项是使用 mergeSchema spark将读取所有Parquet文件的所有模式并合并它们。
因此,在不修改Parquet文件的情况下,无法放置默认值。
另一种可能的方法是通过设置选项在读取parques时提供自己的模式 .schema() 就像这样:

spark.read.schema(StructType(Array(FieldType("name", StringType), ...)).parquet(...)

但在这种情况下,没有设置默认值的选项。
因此,剩下的唯一解决方案是手动添加列缺省值
如果我们有两个parquet,第一个包含旧模式的数据:

+----+------+---+
|name|gender|age|
+----+------+---+
|Tom |Male  |30 |
+----+------+---+

第二个包含新模式的数据:

+-----+------+---+------+
|name |gender|age|office|
+-----+------+---+------+
|Jane |Female|45 |Idaho |
|Roger|Male  |22 |null  |
+-----+------+---+------+

如果你不想更换所有的 null 值在“office”列中,可以使用 .na.fill 具体如下:

spark.read.option("mergeSchema", "true").parquet(path).na.fill("California", Array("office"))

结果如下:

+-----+------+---+----------+
|name |gender|age|office    |
+-----+------+---+----------+
|Jane |Female|45 |Idaho     |
|Roger|Male  |22 |California|
|Tom  |Male  |30 |California|
+-----+------+---+----------+

如果只希望旧数据获得默认值,则必须将每个Parquet文件读取到一个Dataframe,必要时添加具有默认值的列,并合并所有生成的Dataframe:

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.util.CaseInsensitiveStringMap

ParquetTable("my_table",
  sparkSession = spark,
  options = CaseInsensitiveStringMap.empty(),
  paths = Seq(path),
  userSpecifiedSchema = None,
  fallbackFileFormat = classOf[ParquetFileFormat]
).fileIndex.allFiles().map(file => {
  val dataframe = spark.read.parquet(file.getPath.toString)

  if (dataframe.columns.contains("office")) {
    dataframe
  } else {
    dataframe.withColumn("office", lit("California"))
  }
}).reduce(_ unionByName _)

结果如下:

+-----+------+---+----------+
|name |gender|age|office    |
+-----+------+---+----------+
|Jane |Female|45 |Idaho     |
|Roger|Male  |22 |null      |
|Tom  |Male  |30 |California|
+-----+------+---+----------+

请注意,所有零件 ParquetTable([...].allFiles() 检索Parquet文件列表。如果您使用的是hadoop或本地文件系统,那么它可以简化。

相关问题