I want to convert a PySpark DataFrame into a specific JSON format. I'd like to produce the required JSON structure without converting to a Python DataFrame.
Input PySpark df:
   model  year                timestamp
0  i20    [2019, 2018, 2017]  2020-07-20 10:42:38.935
1  i10    [2017]              2020-07-20 10:42:38.935
Required JSON output based on the PySpark df (note: the timestamp field is constant and identical for all rows):
{
    "timestamp": "2020-07-20 10:42:38.935",
    "details": [{
        "model": "i20",
        "year": ["2019", "2018", "2017"]
    }, {
        "model": "i10",
        "year": ["2017"]
    }]
}
Here's what I've managed so far:
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

# Serialize every row to a JSON string, collect the strings per partition,
# and write the resulting arrays out as JSON
df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("details"))\
    .select(col("details"))\
    .coalesce(1).write.option("quote", " ").mode('overwrite').json("JSON path to HDFS")
The output I've achieved so far looks like this:
{"details":["{\"model\":\"i20\",\"year\":[\"2019\",\"2018\",\"2017\"],\"timestamp\":\"2020-07-20T10:39:57.829Z\"}"]}
{"details":["{\"model\":\"i10\",\"year\":[\"2017\"],\"timestamp\":\"2020-07-20T10:39:57.829Z\"}"]}
Any help is appreciated. Thanks.
1 Answer
Try this -
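A minimal sketch of one way to get that shape, assuming the question's schema (model, year, and timestamp columns, with timestamp identical on every row): skip the per-row to_json call, group on the constant timestamp, and collect (model, year) structs into a details array. The sample data below just mirrors the question's input.

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct

spark = SparkSession.builder.getOrCreate()

# Sample data mirroring the question's input; timestamp is the same on every row
df = spark.createDataFrame(
    [("i20", ["2019", "2018", "2017"], "2020-07-20 10:42:38.935"),
     ("i10", ["2017"], "2020-07-20 10:42:38.935")],
    ["model", "year", "timestamp"],
)

# Grouping on the constant timestamp collapses all rows into a single record;
# collect_list(struct(...)) builds the "details" array of {model, year} objects
result = (
    df.groupBy("timestamp")
      .agg(collect_list(struct("model", "year")).alias("details"))
)

result.coalesce(1).write.mode("overwrite").json("JSON path to HDFS")

Because the structs stay as real Spark columns instead of being pre-serialized with to_json, the writer emits nested objects rather than the escaped strings shown above, producing a single record of the form {"timestamp": "2020-07-20 10:42:38.935", "details": [{"model": "i20", "year": [...]}, {"model": "i10", "year": [...]}]}.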