将sql数据转换为json数组[java spark]

hec6srdp  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(751)

我有dataframe,想转换成json数组请看下面的例子
Dataframe

+------------+--------------------+----------+----------------+------------------+--------------
|     Name|                  id|request_id|create_timestamp|deadline_timestamp|
+------------+--------------------+----------+----------------+------------------+--------------
|    Freeform|59bbe3ad-f487-44| htvjiwmfe|   1589155200000|   1591272659556
|         D23|59bbe3ad-f487-44| htvjiwmfe|   1589155200000|   1591272659556
|      Stores|59bbe3ad-f487-44| htvjiwmfe|   1589155200000|   1591272659556
|VacationClub|59bbe3ad-f487-44| htvjiwmfe|   1589155200000|   1591272659556

需要json格式,如下所示:

[
   {
      "testname":"xyz",
      "systemResponse":[
         {
            "name":"FGH",
            "id":"59bbe3ad-f487-44",
            "request_id":1590791280,
            "create_timestamp":1590799280

         },
         {
           "name":"FGH",
            "id":"59bbe3ad-f487-44",
            "request_id":1590791280,
            "create_timestamp":1590799280,
         }
      ]
   }
]
lyr7nygr

lyr7nygr1#

您可以定义2个bean
从第一个df创建数组作为内部bean的数组
将testname和requestdetailarray定义为array的父bean
请同时查找代码内联注解

object DataToJsonArray {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    //Load you dataframe
    val requestDetailArray = List(
      ("Freeform", "59bbe3ad-f487-44", "htvjiwmfe", "1589155200000", "1591272659556"),
      ("D23", "59bbe3ad-f487-44", "htvjiwmfe", "1589155200000", "1591272659556"),
      ("Stores", "59bbe3ad-f487-44", "htvjiwmfe", "1589155200000", "1591272659556"),
      ("VacationClub", "59bbe3ad-f487-44", "htvjiwmfe", "1589155200000", "1591272659556")
    ).toDF
      //Map your Dataframe to RequestDetails bean
      .map(row => RequestDetails(row.getString(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4)))
      //Collect it as Array
      .collect() 

    //Create another data frme with List[BaseClass] and set the (testname,Array[RequestDetails])
    List(BaseClass("xyz", requestDetailArray)).toDF()
      .write
      //Output your Dataframe as JSON
      .json("/json/output/path")
  }

}

case class RequestDetails(Name: String, id: String, request_id: String, create_timestamp: String, deadline_timestamp: String)

case class BaseClass(testname: String = "xyz", systemResponse: Array[RequestDetails])
z2acfund

z2acfund2#

检查以下代码。

import org.apache.spark.sql.functions._

df.withColumn("systemResponse",
     array(
           struct("id","request_id","create_timestamp","deadline_timestamp").as("data")
         )
)
.select("systemResponse")
.toJSON
.select(col("value").as("json_data"))
.show(false)

+-----------------------------------------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|{"systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
|{"systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
|{"systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
|{"systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------+

更新

scala> :paste
// Entering paste mode (ctrl-D to finish)

df.withColumn("systemResponse",
     array(
           struct("id","request_id","create_timestamp","deadline_timestamp").as("data")
         )
)
.withColumn("testname",lit("xyz"))
.select("testname","systemResponse")
.toJSON
.select(col("value").as("json_data"))
.show(false)

// Exiting paste mode, now interpreting.

+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"testname":"xyz","systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
|{"testname":"xyz","systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
|{"testname":"xyz","systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
|{"testname":"xyz","systemResponse":[{"id":"59bbe3ad-f487-44","request_id":"htvjiwmfe","create_timestamp":"1589155200000","deadline_timestamp":"1591272659556"}]}|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

相关问题