使用spark 2.3.2。
我试图使用dataframe的一些列的值,并将它们放入现有的json结构中。假设我有这个Dataframe:
val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337")).toDF("key", "p", "o")
// used as key for nested json structure
val app = "appX"
基本上,我想从这个专栏
{
"foo": "bar",
"meta": {
"app1": {
"p": "2",
"o": "100"
},
"app2": {
"p": "5",
"o": "200"
}
}
}
对此:
{
"meta": {
"app1": {
"p": "2",
"o": "100"
},
"app2": {
"p": "5",
"o": "200"
},
"appX": {
"p": "10",
"o": "1337"
}
}
}
基于列 p
以及 o
Dataframe的。
我试过:
def process(inputDF: DataFrame, appName: String): DataFrame = {
val res = inputDF
.withColumn(appName, to_json(expr("(p, o)")))
.withColumn("meta", struct(get_json_object('key, "$.meta")))
.selectExpr(s"""struct(meta.*, ${appName} as ${appName}) as myStruct""")
.select(to_json('myStruct).as("newMeta"))
res.show(false)
res
}
val resultDF = process(testDF, app)
val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)
StringContext.treatEscapes(resultString) must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")
但是这个Assert不匹配,因为我不能
获取的内容 appX
与其他两个应用程序的级别相同
不知道如何正确处理引号,以及
不知道如何将“col1”重命名为“meta”。
测试失败:
Expected :"{"[meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}]}"
Actual :"{"[col1":"{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"}}","appX":"{"p":"10","o":"1337"}"]}"
1条答案
按热度按时间2ic8powd1#
提取
meta
内容转换
p
,o
列为map
数据类型。Map(lit(appx),结构($“p”,$“o”))然后使用
map_concat
函数来连接数据。检查以下代码。
创建
schema
转换string
至json
.打印架构
最终输出
Spark版本>=
2.4.0
与UDF
&案例类帮助。定义要保留的案例类
p
&o
列值定义自定义项到concatMap。
最终输出