我有一个表,有两列id,json_string,需要将json_string转换成mongodb文档格式。我正在从spark/scala向mongodb发送数据。
我尝试使用withcolumn,但仍然没有获得所需的格式。这是我到目前为止所拥有的,所以任何帮助都将非常感谢。
原始json字符串示例(df)
val df=spark.sql("select id, json_string from mytable")
{"id":"0001","json_string":"{\"header\": {\"column1\":\"value1\",\"column2\":\"value2\"},\"tail\": [{\"column3\":\"value3\",\"column4\":\"value4\",\"column5\":\"value5\"}]}"}
使用withcolumn(df2)可以得到:
val df2=df.withColumn("json_string",from_json(col("json_string"),MapType(StringType,StringType)))
{"id":"0001","json_string":{"header":"{\"column1\":\"value1\",\"column2\":\"value2\"}","tail":"[{\"column3\":\"value3\",\"column4\":\"value4\",\"column5\":\"value5\"}]"}}
所需格式:
{"id":{"$id":"0001"},"header":{"column1":"value1","column2":"value2"},"tail":[{"column3":"value3","column4":"value4","column5":"value5"}]}
所需格式图片示例
1条答案
按热度按时间kcrjzv8t1#
不需要手动定义模式,您可以动态地获取它并将其与一起使用
from_json
```val json_schema = spark.read.json(df.select("json_string").as[String]).schema
val df2 = df.withColumn("json_string", from_json(col("json_string"), json_schema))
.select("id", "json_string.*")
+----+----------------+--------------------------+
|id |header |tail |
+----+----------------+--------------------------+
|0001|{value1, value2}|[{value3, value4, value5}]|
+----+----------------+--------------------------+
root
|-- id: string (nullable = true)
|-- header: struct (nullable = true)
| |-- column1: string (nullable = true)
| |-- column2: string (nullable = true)
|-- tail: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- column3: string (nullable = true)
| | |-- column4: string (nullable = true)
| | |-- column5: string (nullable = true)