在spark scala中将Dataframe格式化为嵌套的json

cs7cruho  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(423)

我有一个Dataframedfu原始如下

我想把它转换成一个嵌套的json格式,如下所示

到目前为止我已经做到了

val df_original =data.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val data1 = data.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val resultDf = df_original.join(data1, Seq("unique_id")).dropDuplicates()

生成下面的json

{
  "unique_id": "12345678",
  "transaction_status": "posted",
  "amount": "116.26",
  "category": "Family",
  "email_id": "abcd@gmail.com",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": [
    {
      "acct_no": "51663",
      "ciskey": "47626220"
    },
    {
      "acct_no": "51663",
      "ciskey": "47626221"
    }, 
    {
      "acct_no": "51663",
      "ciskey": "47626222"
    }

  ]
}

Please help me to move forward
bqucvtff

bqucvtff1#

另一种选择-

加载测试数据

val data =
      """
        |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey
        |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626220
        |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626221
        |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626222
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()
    /**
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626220|
      * |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626221|
      * |posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626222|
      * +------------------+------+--------+--------------+---------+-------+--------+
      *
      * root
      * |-- transaction_status: string (nullable = true)
      * |-- amount: double (nullable = true)
      * |-- category: string (nullable = true)
      * |-- email_id: string (nullable = true)
      * |-- unique_id: integer (nullable = true)
      * |-- acct_no: integer (nullable = true)
      * |-- ciskey: integer (nullable = true)
      */

创建所需的json

val groupBy = df.columns.filter(_!="ciskey")
    df.groupBy(groupBy.map(col): _*).agg(collect_list($"ciskey").as("accounts"))
      .withColumn("ciskey", element_at($"accounts", 1) )
      .withColumn("customers", expr("TRANSFORM(accounts, " +
        "x -> named_struct('ciskey_no', x, 'ciskey_val', 'IND'))"))
      .withColumn("accounts",
        struct($"acct_no", $"customers"))
      .drop("customers")
      .toJSON
      .show(false)

    /**
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |value                                                                                                                                                                                                                                                                                                                          |
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"abcd@gmail.com","unique_id":12345678,"acct_no":51663,"accounts":{"acct_no":51663,"customers":[{"ciskey_no":47626220,"ciskey_val":"IND"},{"ciskey_no":47626221,"ciskey_val":"IND"},{"ciskey_no":47626222,"ciskey_val":"IND"}]},"ciskey":47626220}|
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      */

json文件-

{
    "transaction_status": "posted",
    "amount": 116.26,
    "category": "Family",
    "email_id": "abcd@gmail.com",
    "unique_id": 12345678,
    "acct_no": 51663,
    "accounts": {
        "acct_no": 51663,
        "customers": [{
            "ciskey_no": 47626220,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626221,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626222,
            "ciskey_val": "IND"
        }]
    },
    "ciskey": 47626220
}
qyswt5oh

qyswt5oh2#

检查以下代码。

scala> df.show(false)
+------------------+------+--------+--------------+---------+-------+--------+
|transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
+------------------+------+--------+--------------+---------+-------+--------+
|posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626220|
|posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626221|
|posted            |116.26|Family  |abcd@gmail.com|12345678 |51663  |47626222|
+------------------+------+--------+--------------+---------+-------+--------+
scala> 

df
.groupBy($"unique_id")
.agg(
    collect_set(
        struct(
            $"transaction_status",
            $"amount",
            $"category",
            $"email_id",
            $"unique_id",
            $"acct_no"
        )).as("json_data"),
    first($"ciskey").as("ciskey"),
    first("acct_no").as("acct_no"),
    collect_list(struct($"ciskey")).as("customers")
)
.withColumn("json_data",explode($"json_data"))
.withColumn("accounts",struct($"acct_no",$"customers"))
.select($"json_data.*",$"ciskey",$"accounts")
.toJSON
.show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"abcd@gmail.com","unique_id":"12345678","acct_no":"51663","ciskey":"47626220","accounts":{"acct_no":"51663","customers":[{"ciskey":"47626220"},{"ciskey":"47626221"},{"ciskey":"47626222"}]}}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

上面的代码生成如下的数据,可能你可以在上面添加逻辑。

{
  "transaction_status": "posted",
  "amount": 116.26,
  "category": "Family",
  "email_id": "abcd@gmail.com",
  "unique_id": "12345678",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": {
    "acct_no": "51663",
    "customers": [
      {
        "ciskey": "47626220"
      },
      {
        "ciskey": "47626221"
      },
      {
        "ciskey": "47626222"
      }
    ]
  }
}

相关问题