将json字符串列拆分为多个列

wh6knrhe  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(491)

我正在寻找一个通用的解决方案,从json字符串列中将所有json字段提取为列。

  1. df = spark.read.load(path)
  2. df.show()

“path”中文件的文件格式是parquet
样本数据

  1. |id | json_data
  2. | 1 | {"name":"abc", "depts":["dep01", "dep02"]}
  3. | 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
  4. | 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}

预期产量

  1. |id | name | depts | sal | address_city | address_state
  2. | 1 | "abc" | ["dep01", "dep02"] | null| null | null
  3. | 2 | "xyz" | ["dep03"] | 100 | null | null
  4. | 3 | "pqr" | ["dep02"] | null| "SF" | "CA"

我知道我可以通过创建一个定义了模式的structtype并使用'from_json'方法来提取列。
但这种方法需要手动定义模式。

  1. val myStruct = StructType(
  2. Seq(
  3. StructField("name", StringType),
  4. StructField("depts", ArrayType(StringType)),
  5. StructField("sal", IntegerType)
  6. ))
  7. var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))

有没有更好的方法可以在不手动定义模式的情况下展平json列?在提供的示例中,我可以看到可用的json字段。但实际上,我无法遍历所有行来查找所有字段。
因此,我正在寻找一种解决方案,将所有字段拆分为列,而不指定列的名称或类型。

2nbm6dog

2nbm6dog1#

假设 json_data 属于类型 map (您始终可以将其转换为 map 如果不是),你可以用 getItem :

  1. df = spark.createDataFrame([
  2. [1, {"name": "abc", "depts": ["dep01", "dep02"]}],
  3. [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
  4. ],
  5. ['id', 'json_data']
  6. )
  7. df.select(
  8. df.id,
  9. df.json_data.getItem('name').alias('name'),
  10. df.json_data.getItem('depts').alias('depts'),
  11. df.json_data.getItem('sal').alias('sal')
  12. ).show()
  13. +---+----+--------------+----+
  14. | id|name| depts| sal|
  15. +---+----+--------------+----+
  16. | 1| abc|[dep01, dep02]|null|
  17. | 2| xyz| [dep03]| 100|
  18. +---+----+--------------+----+

更具动态性的列提取方法:

  1. cols = ['name', 'depts', 'sal']
  2. df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
展开查看全部
pn9klfpd

pn9klfpd2#

基于@gaurang shah的回答,我实现了一个处理嵌套json结构的解决方案,并修复了使用单调递增的id(非顺序)的问题
在这种方法中,“populatecolumnname”函数递归地检查structtype列并填充列名。
“renamecolumns”函数通过将“.”替换为“\”来重命名列,以标识嵌套的json字段。
“addindex”函数在解析json列后向dataframe添加索引以加入dataframe。

  1. def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {
  2. val indexCol = "internal_temp_id"
  3. def populateColumnName(col : StructField) : Array[String] = {
  4. col.dataType match {
  5. case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
  6. case rest => Array(col.name)
  7. }
  8. }
  9. def renameColumns(name : String) : String = {
  10. if(name contains ".") {
  11. name + " as " + name.replaceAll("\\.", "_")
  12. }
  13. else name
  14. }
  15. def addIndex(df : DataFrame) : DataFrame = {
  16. // Append "rowid" column of type Long
  17. val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
  18. // Zip on RDD level
  19. val rddWithId = df.rdd.zipWithIndex
  20. // Convert back to DataFrame
  21. spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
  22. }
  23. val dfWithID = addIndex(df)
  24. val jsonDF = df.select(columnName)
  25. val ds = jsonDF.rdd.map(_.getString(0)).toDS
  26. val parseDF = spark.read.option("inferSchema",true).json(ds)
  27. val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
  28. var resultDF = parseDF.selectExpr(columnNames:_*)
  29. val jsonDFWithID = addIndex(resultDF)
  30. val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
  31. joinDF
  32. }
  33. val res = flattenJSON(jsonDF, "address")
展开查看全部
sd2nnvve

sd2nnvve3#

如果它是一个 CSV 只有一列作为 JSON 数据。您可以使用以下解决方案。

  1. val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
  2. val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
  3. val ds = rdd.toDS
  4. val jsonDF = spark.read.json(ds)
  5. val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
  6. val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
  7. val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")

这就是最终Dataframe的样子。

  1. scala> joinDF.printSchema()
  2. root
  3. |-- address: struct (nullable = true)
  4. | |-- city: string (nullable = true)
  5. | |-- state: string (nullable = true)
  6. |-- depts: array (nullable = true)
  7. | |-- element: string (containsNull = true)
  8. |-- name: string (nullable = true)
  9. |-- sal: long (nullable = true)
  10. |-- id : double (nullable = true)

如果它是一个 JSON 文件。为了我。 inferSchema 很好用。
json文件

  1. ~/Downloads cat test.json
  2. {"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
  3. {"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}

代码

  1. scala> scc.read.format("json").option("inerSchema", true).load("Downloads/test.json").show()
  2. +--------------+---+----+----+
  3. | depts| id|name| sal|
  4. +--------------+---+----+----+
  5. |[dep01, dep02]| 1| abc|null|
  6. | [dep03]| 2| xyz| 100|
  7. +--------------+---+----+----+
展开查看全部

相关问题