Add values to an existing nested JSON in a Spark DataFrame column

8zzbczxx · posted 2021-05-16 in Spark
Follow (0) | Answers (1) | Views (561)

Using Spark 2.3.2.
I'm trying to take the values of some DataFrame columns and insert them into an existing JSON structure. Say I have this DataFrame:

val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337")).toDF("key", "p", "o")

// used as key for nested json structure
val app = "appX"

Basically, I want to go from this column content:

{
  "foo": "bar",
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    }
  }
}

to this:

{
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    },
    "appX": {
      "p": "10",
      "o": "1337"
    }
  }
}

based on the p and o columns of the DataFrame.
I tried:

def process(inputDF: DataFrame, appName: String): DataFrame = {
  val res = inputDF
    .withColumn(appName, to_json(expr("(p, o)")))
    .withColumn("meta", struct(get_json_object('key, "$.meta")))
    .selectExpr(s"""struct(meta.*, ${appName} as ${appName}) as myStruct""")
    .select(to_json('myStruct).as("newMeta"))

  res.show(false)
  res
}

val resultDF = process(testDF, app)

val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)

StringContext.treatEscapes(resultString) must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")

But this assertion doesn't match, because I can't
get the appX content to the same level as the other two apps,
don't know how to handle the quoting correctly, and
don't know how to rename "col1" to "meta".
Test failure:

Expected :"{"[meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}]}"
Actual   :"{"[col1":"{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"}}","appX":"{"p":"10","o":"1337"}"]}"
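The escaped quotes in the "Actual" output come from get_json_object: it returns the meta subtree as a plain JSON string, not a struct, so to_json serializes it a second time and escapes every inner quote. A minimal plain-Scala sketch of that double serialization (no Spark; the hand-rolled escaping is purely illustrative):

```scala
// get_json_object yields the subtree as a STRING, e.g.:
val metaAsString = """{"app1":{"p":"2","o":"100"}}"""

// Serializing that string again as a JSON value escapes every inner quote,
// which is exactly the shape seen in the failing "Actual" output above:
val reSerialized = "\"" + metaAsString.replace("\"", "\\\"") + "\""
println(reSerialized) // prints: "{\"app1\":{\"p\":\"2\",\"o\":\"100\"}}"
```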

2ic8powd1#

Extract the meta content,
convert the p and o columns into a map data type: map(lit(appX), struct($"p", $"o")),
then use the map_concat function to merge the maps.
Check the following code.
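As a plain-Scala sketch of what this merge does (Scala's `++` on immutable maps has the same semantics as map_concat here, though Spark's map_concat can fail on duplicate keys depending on spark.sql.mapKeyDedupPolicy; the data mirrors the example):

```scala
// The existing meta map parsed out of the key column:
val meta = Map(
  "app1" -> Map("p" -> "2", "o" -> "100"),
  "app2" -> Map("p" -> "5", "o" -> "200")
)
// The new entry built by map(lit(appX), struct($"p", $"o")):
val appXEntry = Map("appX" -> Map("p" -> "10", "o" -> "1337"))

// map_concat merges both maps; appX ends up at the same level as app1/app2:
val merged = meta ++ appXEntry
```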

scala> testDF.show(false)
+---------------------------------------------------------------------------------+---+----+
|key                                                                              |p  |o   |
+---------------------------------------------------------------------------------+---+----+
|{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}|10 |1337|
+---------------------------------------------------------------------------------+---+----+

Create a schema to parse the JSON string:

scala> val schema = new StructType().add("foo",StringType).add("meta",MapType(StringType,new StructType().add("p",StringType).add("o",StringType)))

Print the schema:

scala> schema.printTreeString
root
 |-- foo: string (nullable = true)
 |-- meta: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- p: string (nullable = true)
 |    |    |-- o: string (nullable = true)

scala> val appX = "appX"

testDF
.withColumn("key",from_json($"key",schema)) // convert json string to json using predefined schema.
.withColumn(
    "key",
    struct(
        $"key.foo", // foo value from key column.
        map_concat(
            $"key.meta", // extracting meta from key column.
            map(
                lit(appX), // Constant appX value
                struct($"p",$"o") // wrapping p, o values into struct.
            ) // converting appX,p,o into map(appX -> (p,o))
        )
        .as("meta") // giving alias to match existing meta in key.
    ) // using struct to combine foo, meta columns.
)
.select(to_json(struct($"key")).as("json_data")) // converting key value into json format.
.show(false)

Final output:

+-----------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------+
|{"key":{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}}|
+-----------------------------------------------------------------------------------------------------------------+

Note that map_concat requires Spark >= 2.4.0. For older versions (the question uses 2.3.2), a UDF and a case class help.
Define a case class to hold the p & o column values:

scala> case class PO(p:String,o:String)

Define a UDF to concatenate the maps:

scala> val map_concat = udf((mp:Map[String,PO],mpa:Map[String,PO]) => mp ++ mpa)

scala> testDF
.withColumn("key",from_json($"key",schema))
.withColumn(
    "key",
    to_json(
        struct(
            $"key.foo",
            map_concat(
                $"key.meta",
                map(
                    lit(app),
                    struct($"p",$"o")
                )
            ).as("meta")
        )
    )
)
.show(false)

Final output:

+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|key                                        |p  |o   |newMap                                                                                                   |
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|[bar,Map(app1 -> [2,100], app2 -> [5,200])]|10 |1337|{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}|
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
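The UDF body is ordinary Scala map concatenation; a quick plain-Scala check of what it computes (the PO values are invented to mirror the example data):

```scala
case class PO(p: String, o: String)

// What the UDF receives: the existing meta map parsed by from_json ...
val mp  = Map("app1" -> PO("2", "100"), "app2" -> PO("5", "200"))
// ... and the single-entry map built from the appX literal and the p/o columns:
val mpa = Map("appX" -> PO("10", "1337"))

val combined = mp ++ mpa // same expression as the UDF body above
```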
