scala—将xml列解析为多个列,并根据spark dataframe中的计数将其转换为行

mo49yndu  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(240)

我有一个场景,其中xml列 response_outputordercount 以及 orders 有相应的订单明细。
例如,xml如下所示 OrderCount 4岁以下 orders 我们有4个 order 细节

<USR_ORD><OrderResponse><OrderResult>
<OrderCount>4</OrderCount>
<ORDTime>2021-02-02 21:13:12</ORDTime><ORDStatus>COMPLETE</ORDStatus>
<ORDValue>
<USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc>
<orders>
<order><name>MR RITA SOMA</name><address>606 JAL TXS</address><tracknumber>7825225</tracknumber><status>UNK</status></order>
<order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825226</tracknumber><status>FAIL</status></order>
<order><name>MR RODREX SOMA</name><address>18, GHC,BAN</address><tracknumber>7825224</tracknumber><status>SUC</status></order>
<order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825223</tracknumber><status>SUC</status></order>
</orders>
<USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD>
</ORDValue>
</OrderResult></OrderResponse></USR_ORD>

我需要根据 ordercount ,如果 ordercount 如果是4,那么我需要迭代4次 orders 取4条记录 order 细节以及 ordercount 如果是1,那么我需要用 order 详细信息。
有人能帮我用spark2,scala解决方案吗?
源数据:


|customer_id|response_id|response_output|
+-----------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100        |1          |<USR_ORD><OrderResponse><OrderResult><OrderCount>1</OrderCount><ORDTime>2021-02-02 10:34:19</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><orders><order><name>MRS MITA PERS</name><address>17 MAXI RD CHN</address><tracknumber>7825222</tracknumber><status>FAIL</status><amount>4500</amount><orderdate>2019-10-18</orderdate></order></orders><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse|
|200        |1          |<USR_ORD><OrderResponse><OrderResult><OrderCount>4</OrderCount><ORDTime>2021-02-02 21:13:12</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><orders><order><name>MR RITA SOMA</name><address>606 JAL TXS</address><tracknumber>7825225</tracknumber><status>UNK</status><amount>1030</amount><orderdate>2020-11-16</orderdate></order><order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825226</tracknumber><status>FAIL</status><amount>8000</amount><orderdate>2018-07-17</orderdate></order><order><name>MR RODREX SOMA</name><address>18, GHC, BAN</address><tracknumber>7825224</tracknumber><status>SUC</status><amount>2500</amount><orderdate>2017-09-16</orderdate></order><order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825223</tracknumber><status>SUC</status><amount>2700</amount><orderdate>2017-04-22</orderdate></order></orders><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>|


当我尝试运行下面的sql时,我得到如下结果,但是我需要为customer\u id 200获取4条记录,因为计数是4,并且有相应的订单细节。

spark.sql("""select
     |     customer_id,
     |     xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/OrderCount') as OrderCount,
     |     xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/name') as name,
     | xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/address') as address,
     | xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/tracknumber') as tracknumber,
     | xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/status') as status
     |          from cust_tbl""").show()

我得到的结果是:

+-----------+----------+-------------+--------------+-----------+------+
|customer_id|OrderCount|         name|       address|tracknumber|status|
+-----------+----------+-------------+--------------+-----------+------+
|        100|         1|MRS MITA PERS|17 MAXI RD CHN|    7825222|  FAIL|
|        200|         4| MR RITA SOMA|   606 JAL TXS|    7825225|   UNK|
+-----------+----------+-------------+--------------+-----------+------+

期望输出:

+-----------+----------+------------+-----------+-----------+------+
|customer_id|OrderCount|name        |address    |tracknumber|status|
+-----------+----------+------------+-----------+-----------+------+
|200        |4         |MRRITASOMA  |606JALTXS  |7825225    |UNK   |
|200        |4         |MRRITASOMA  |1BAL HAL   |7825226    |FAIL  |
|200        |4         |MRRODREXSOMA|18 GHC BAN |7825224    |SUC   |
|200        |4         |MRRITASOMA  |1 BAL HAL  |7825223    |SUC   |
|100        |1         |MRSMITAPERS |17MAXIRDCHN|7825222    |FAIL  |
+-----------+----------+------------+-----------+-----------+------+
pwuypxnk

pwuypxnk1#

函数 xpath_string 为给定的xpath表达式提取一个字符串值。对于您的情况,您需要使用 xpath 获取每个订单详细信息的节点值数组( name , status ,…)并使用 arrays_zip :

val df1 = df.withColumn(
    "OrderCount",
    expr("xpath_string(response_output, 'USR_ORD/OrderResponse/OrderResult/OrderCount')")
).withColumn(
    "orders",
    explode(
        arrays_zip(
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/name/text()')"),
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/address/text()')"),
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/tracknumber/text()')"),
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/status/text()')")
        ).cast("array<struct<name:string,address:string,tracknumber:string,status:string>>")
    )
).select("customer_id", "OrderCount", "orders.*")

df1.show(false)

//+-----------+----------+--------------+--------------+-----------+------+
//|customer_id|OrderCount|name          |address       |tracknumber|status|
//+-----------+----------+--------------+--------------+-----------+------+
//|100        |1         |MRS MITA PERS |17 MAXI RD CHN|7825222    |FAIL  |
//|200        |4         |MR RITA SOMA  |606 JAL TXS   |7825225    |UNK   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825226    |FAIL  |
//|200        |4         |MR RODREX SOMA|18, GHC, BAN  |7825224    |SUC   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825223    |SUC   |
//+-----------+----------+--------------+--------------+-----------+------+

更新

对于Spark<2.4,您可以 posexplode 每个数组列和索引上的联接:

val df1 = df.withColumn(
    "OrderCount",
    expr("xpath_string(response_output, 'USR_ORD/OrderResponse/OrderResult/OrderCount')")
  ).select(  
    col("customer_id"),
    col("OrderCount"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/name/text()')").as("name"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/address/text()')").as("address"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/tracknumber/text()')").as("tracknumber"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/status/text()')").as("status")
  )

val result = df1.selectExpr("customer_id", "OrderCount", "posexplode(name) as (idx, name)")
  .join(
    df1.selectExpr("customer_id", "posexplode(address) as (idx, address)"),
    Seq("idx", "customer_id")
  ).join(
    df1.selectExpr("customer_id","posexplode(tracknumber) as (idx, tracknumber)"),
    Seq("idx", "customer_id")
  ).join(
    df1.selectExpr("customer_id", "posexplode(status) as (idx, status)"),
    Seq("idx", "customer_id")
  ).drop("idx")

result.show(false)

//+-----------+----------+--------------+--------------+-----------+------+
//|customer_id|OrderCount|name          |address       |tracknumber|status|
//+-----------+----------+--------------+--------------+-----------+------+
//|100        |1         |MRS MITA PERS |17 MAXI RD CHN|7825222    |FAIL  |
//|200        |4         |MR RITA SOMA  |606 JAL TXS   |7825225    |UNK   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825226    |FAIL  |
//|200        |4         |MR RODREX SOMA|18, GHC, BAN  |7825224    |SUC   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825223    |SUC   |
//+-----------+----------+--------------+--------------+-----------+------+

相关问题