将pyspark df转换为所需的json格式

4ngedf3f  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(359)

我想将pysparkDataframe转换为指定的json格式。我希望在不转换为pythonDataframe的情况下实现所需的json结构。
输入Pypark df:

model                year               timestamp
0   i20    [2019, 2018, 2017]      2020-07-20 10:42:38.935
1   i10                [2017]      2020-07-20 10:42:38.935

基于pyspark df的必需json输出:(注:timestamp字段是常量字段,对于所有行都是相同的)

{
    "timestamp": 2020-07-20 10:42:38.935,
    "details": [{
        "model ": "i20",
        "year": ["2019, 2018, 2017"]
    }, {
        "model ": "i10",
        "year": ["2017"]
    }]
}

直到现在我都能做到

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("details"))\
    .select(col("details"))\
    .coalesce(1).write.option("quote", " ").mode('overwrite').json("JSON path to HDFS")

到目前为止,我取得的成果如下:

{"details":["{\"model\":\"i20\",\"year\":[\"2019\",\"2018\",\"2017\"],\"timestamp\":\"2020-07-20T10:39:57.829Z\"}"]}
{"details":["{\"model\":\"i10\",\"year\":[\"2017\"],\"timestamp\":\"2020-07-20T10:39:57.829Z\"}"]}

感谢您的帮助。谢谢

hjzp0vay

hjzp0vay1#

试试这个-

val df = spark.sql(
      """
        |select model, year, timestamp
        | from values
        | ('i20', array(2019, 2018, 2017), '2020-07-20 10:42:38.935'),
        |  ('i10', array(2017), '2020-07-20 10:42:38.935')
        |  T(model, year, timestamp)
      """.stripMargin)
    df.show(false)

    /**
      * +-----+------------------+-----------------------+
      * |model|year              |timestamp              |
      * +-----+------------------+-----------------------+
      * |i20  |[2019, 2018, 2017]|2020-07-20 10:42:38.935|
      * |i10  |[2017]            |2020-07-20 10:42:38.935|
      * +-----+------------------+-----------------------+
      */
    df.createOrReplaceTempView("table")
    spark.sql("select timestamp, collect_list(struct(model, year)) as details from table group by timestamp")
      .toJSON
      .show(false)

    /**
      * +-------------------------------------------------------------------------------------------------------------------------+
      * |value                                                                                                                    |
      * +-------------------------------------------------------------------------------------------------------------------------+
      * |{"timestamp":"2020-07-20 10:42:38.935","details":[{"model":"i20","year":[2019,2018,2017]},{"model":"i10","year":[2017]}]}|
      * +-------------------------------------------------------------------------------------------------------------------------+
      */
    df.groupBy("timestamp")
      .agg(collect_list(struct("model", "year")).as("details"))
      .toJSON
      .show(false)

    /**
      * +-------------------------------------------------------------------------------------------------------------------------+
      * |value                                                                                                                    |
      * +-------------------------------------------------------------------------------------------------------------------------+
      * |{"timestamp":"2020-07-20 10:42:38.935","details":[{"model":"i20","year":[2019,2018,2017]},{"model":"i10","year":[2017]}]}|
      * +-------------------------------------------------------------------------------------------------------------------------+
      */

相关问题