使用sparkscala解析列中的json根

lh80um4z  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(500)

为了将jsom的根转换为Dataframe中记录的数量未知的记录,我遇到了一些问题。
我使用类似于以下内容的json生成了一个Dataframe:

val exampleJson = spark.createDataset(
  """
  {"ITEM1512":
        {"name":"Yin",
         "address":{"city":"Columbus",
                    "state":"Ohio"}
                    }, 
    "ITEM1518":
        {"name":"Yang",
         "address":{"city":"Working",
                    "state":"Marc"}
                    }
  }""" :: Nil)

当我按照下面的说明读的时候

val itemsExample = spark.read.json(exampleJson)

生成的架构和Dataframe如下所示:

+-----------------------+-----------------------+
|ITEM1512               |ITEM1518               |
+-----------------------+-----------------------+
|[[Columbus, Ohio], Yin]|[[Working, Marc], Yang]|
+-----------------------+-----------------------+

root
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

但我想产生这样的结果:

+-----------------------+-----------------------+
|Item                   |Values                 |
+-----------------------+-----------------------+
|ITEM1512               |[[Columbus, Ohio], Yin]|
|ITEM1518               |[[Working, Marc], Yang]|
+-----------------------+-----------------------+

因此,为了解析这个json数据,我需要读取所有的列并将其添加到Dataframe中的一个记录中,因为我编写的示例不止这两项。事实上,我想在一个数据框中添加数百万项。
我试图复制中的解决方案:how to parse the json data using spark scala with this code:

val columns:Array[String]       = itemsExample.columns
var arrayOfDFs:Array[DataFrame] = Array() 

for(col_name <- columns){

  val temp = itemsExample.selectExpr("explode("+col_name+") as element")
    .select(
      lit(col_name).as("Item"),
      col("element.E").as("Value"))

  arrayOfDFs = arrayOfDFs :+ temp
}

val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)

但是我面临这个问题,而在阅读另一个问题的例子中,根在数组中,在我的例子中,根是一个结构类型。因此,将引发下一个异常:
org.apache.spark.sql.analysisexception:无法解析“explode”( ITEM1512 )'由于数据类型不匹配:函数explode的输入应为数组或Map类型,而不是struct,name:string>

wqnecbli

wqnecbli1#

你可以用 stack 功能。 Example: ```
itemsExample.selectExpr("""stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)""").
show(false)
//+--------+-----------------------+
//|Item |Values |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+

更新: `Dynamic Stack query:` ```
val stack=df.columns.map(x => s"'${x}',${x}").mkString(s"stack(${df.columns.size},",",",")as (Item,Values)")
//stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)

itemsExample.selectExpr(stack).show()
//+--------+-----------------------+
//|Item    |Values                 |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+

相关问题