如何在parquet分区中使用不同的模式

k2arahey  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(458)

我将json文件读入Dataframe。json可以有一个特定于名称的struct字段消息,如下所示。

  1. Json1
  2. {
  3. "ts":"2020-05-17T00:00:03Z",
  4. "name":"foo",
  5. "messages":[
  6. {
  7. "a":1810,
  8. "b":"hello",
  9. "c":390
  10. }
  11. ]
  12. }
  13. Json2
  14. {
  15. "ts":"2020-05-17T00:00:03Z",
  16. "name":"bar",
  17. "messages":[
  18. {
  19. "b":"my",
  20. "d":"world"
  21. }
  22. ]
  23. }

当我将json中的数据读入一个Dataframe时,我得到如下的模式。

  1. root
  2. |-- ts: string (nullable = true)
  3. |-- name: string (nullable = true)
  4. |-- messages: array (nullable = true)
  5. | |-- element: struct (containsNull = true)
  6. | | |-- a: long (nullable = true)
  7. | | |-- b: string (nullable = true)
  8. | | |-- c: long (nullable = true)
  9. | | |-- d: string (nullable = true)

这很好。现在,当我保存到按名称分区的parquet文件时,如何在foo和bar分区中有不同的模式?

  1. path/name=foo
  2. root
  3. |-- ts: string (nullable = true)
  4. |-- name: string (nullable = true)
  5. |-- messages: array (nullable = true)
  6. | |-- element: struct (containsNull = true)
  7. | | |-- a: long (nullable = true)
  8. | | |-- b: string (nullable = true)
  9. | | |-- c: long (nullable = true)
  10. path/name=bar
  11. root
  12. |-- ts: string (nullable = true)
  13. |-- name: string (nullable = true)
  14. |-- messages: array (nullable = true)
  15. | |-- element: struct (containsNull = true)
  16. | | |-- b: string (nullable = true)
  17. | | |-- d: string (nullable = true)

当我从根路径读取数据时,如果我得到所有foo和bar字段的schema,我就没事了。但是当我从path/name=foo读取数据时,我只期望foo模式。

5sxhfpxr

5sxhfpxr1#

1. Partitioning & Storing as Parquet file: 如果保存为Parquet格式,则在阅读时
path/name=foo specify the schema 包括所有必需的字段(a、b、c),然后spark只加载这些字段。
如果我们 won't 指定schema,那么所有字段(a、b、c、d)都将包含在Dataframe中 EX:schema=define structtype...schema spark.read.schema(schema).parquet(path/name=foo).printSchema()2.Partitioning & Storing as JSON/CSV file: 那么spark就不会在 path/name=foo 文件,所以当我们只读取name=foo目录时,我们不会得到 b,d 数据中包含的列。 EX: ```
spark.read.json(path/name=foo).printSchema()
spark.read.csv(path/name=foo).printSchema()

ffx8fchx

ffx8fchx2#

您可以在将Dataframe保存到分区之前更改模式,为此您必须过滤分区记录,然后将它们保存到相应的文件夹中

  1. # this will select only not null columns which will drop col d from foo and a,c from bar
  2. df = df.filter(f.col('name')='foo').select(*[c for c in df.columns if df.filter(f.col(c).isNotNull()).count() > 0])
  3. # then save the df
  4. df.write.json('path/name=foo')

现在每个分区将有不同的模式。

相关问题