How to handle invalid XML strings and invalid JSON strings in a DataFrame / Spark SQL / Spark Scala

jw5wzhpr · asked on 2021-07-13 · Spark
Follow (0) | Answers (1) | Views (421)

I have a scenario where I need to parse XML and JSON values based on another field. The Customer_Order table has two columns named response_id and response_output. response_output contains a mix of JSON strings, XML strings, the literal Error, blanks, and nulls.
I need to implement the following problem statement.
Problem statement
If response_id = 1 and response_output has valid JSON, apply the JSON logic
If response_id = 1 and response_output does not have valid JSON, return null
If response_id = 1 and response_output is an XML value, return null
If response_id = 1 and response_output is Error, return Error
If response_id = 1 and response_output is blank or null, return null
If response_id = 2 and response_output has valid XML, apply the XML logic
If response_id = 2 and response_output does not have valid XML, return null
If response_id = 2 and response_output is a JSON value, return null
If response_id = 2 and response_output is Error, return Error
If response_id = 2 and response_output is blank or null, return null
When I try to implement the problem statement above in Spark SQL, my code crashes whenever it hits an invalid XML or invalid JSON string.
The error is shown below. Can anyone help me handle this?

spark.sql("""select 
    customer_id,
    response_id,
    CASE WHEN (response_id=2 and response_output!="Error") THEN get_json_object(response_output, '$.Metrics.OrderResponseTime')
         WHEN (response_id=1 and response_output!="Error") THEN xpath_string(response_output,'USR_ORD/OrderResponse/USR1OrderTotalTime')
         WHEN ((response_id=1 or response_id=2) and  response_output="Error") THEN "Error"
         ELSE null END as order_time 
         from Customer_Order""").show()

Below is the error I get when running the above query. How can I handle the invalid XML or JSON?

Driver stacktrace:
21/02/05 00:48:06 INFO scheduler.DAGScheduler: Job 5 failed: show at Engine.scala:221, took 1.099890 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 10.0 failed 4 times, most recent failure: Lost task 3.3 in stage 10.0 (TID 83, srwredf2021.analytics1.test.dev.corp, executor 3): java.lang.RuntimeException: Invalid XML document: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 1234; XML document structures must start and end within the same entity.
<USR_ORD><OrderResult><ORDTime>2021-02-02 10:34:19</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult><OrderResponse></USR_ORD>
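
For context, the failure is reproducible outside Spark: the first XML record in the sample data below opens an <OrderResponse> tag just before </USR_ORD> but never closes it, so any SAX-based parser rejects the document. A minimal check of a stripped-down version of that structure, assuming scala-xml is on the classpath:

import scala.util.Try
import scala.xml.XML

// <OrderResponse> is opened but never closed, just like in the failing record
Try(XML.loadString("<USR_ORD><OrderResponse></USR_ORD>"))
// expected: Failure wrapping a SAXParseException about the unterminated <OrderResponse> element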

My code snippet and data for reference
Data list

val custList = List((100,1,"<USR_ORD><OrderResult><ORDTime>2021-02-02 10:34:19</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult><OrderResponse></USR_ORD>"),
(200,1,"<USR_ORD><OrderResponse><OrderResult><ORDTime>2021-02-02 21:13:12</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>"),
(300,1,"Error"),
(400,1,""),
(500,1,"""{"OrderResponseTime":"2021-02-02 11:34:19", "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 }"""),
(600,2,"""{"OrderResponseTime":"2021-02-02 15:14:13", "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 }"""),
(700,2,"""{"OrderResponseTime":"2021-02-02 12:38:26", "OrderResponse":"COMPLETE", "OrderTime":200, "USR1Order":null "USR1Orderqut":4} """),
(800,2,"""{"OrderResponseTime":"2021-02-02 13:24:19", "OrderResponse":"COMPLETE", "OrderTime":100, "USR1Order":null} "USR1Orderqut":1}"""),
(900,2,"<USR_ORD><OrderResponse><OrderResult><ORDTime>2021-02-02 01:12:49</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>"),
(101,2,"Error"),
(202,2,""));

Loading the list into an RDD

val rdd = spark.sparkContext.parallelize(custList)

Applying the schema column names

val DF1 = rdd.toDF("customer_id","response_id","response_output")

Creating the table (temp view)

DF1.createOrReplaceTempView("Customer_Order")

Printing the schema

spark.sql("""select customer_id,response_id,response_output from Customer_Order""").printSchema()

root
 |-- customer_id: integer (nullable = false)
 |-- response_id: integer (nullable = false)
 |-- response_output: string (nullable = true)

Showing the records

spark.sql("""select customer_id,response_id,response_output from Customer_Order""").show()

+-----------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customer_id|response_id|response_output                                                                                                                                                                                                                                                                                                                          |
+-----------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100        |1          |<USR_ORD><OrderResult><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult><OrderResponse></USR_ORD>                |
|200        |1          |<USR_ORD><OrderResponse><OrderResult><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>|
|300        |1          |Error                                                                                                                                                                                                                                                                                                                                    |
|400        |1          |                                                                                                                                                                                                                                                                                                                                         |
|500        |1          |{ "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 }                                                                                                                                                                                                                                                     |
|600        |2          |{ "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 }                                                                                                                                                                                                                                                     |
|700        |2          |{ "OrderResponse":"COMPLETE", "OrderTime":200, "USR1Order":null "USR1Orderqut":4}                                                                                                                                                                                                                                                        |
|800        |2          |{ "OrderResponse":"COMPLETE", "OrderTime":100, "USR1Order":null} "USR1Orderqut":1}                                                                                                                                                                                                                                                       |
|900        |2          |<USR_ORD><OrderResponse><OrderResult><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>|
|101        |2          |Error                                                                                                                                                                                                                                                                                                                                    |
|202        |2          |                                                                                                                                                                                                                                                                                                                                         |
+-----------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Answer 1 — bvjveswy

For JSON you do not need any validation, because get_json_object does not fail when the path does not exist or the JSON is invalid; it simply returns null.
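
A quick illustration of that behavior (my own sketch, reusing the Spark session from the question): both calls below run without error, and the malformed string just yields null:

spark.sql("""SELECT get_json_object('{"OrderTime":100}', '$.OrderTime') AS valid_json,
                    get_json_object('{"OrderTime":100',  '$.OrderTime') AS broken_json""").show()
// expected: valid_json = 100, broken_json = null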
To avoid the exception when extracting values from the XML, you can use a UDF that checks whether the string in response_output can be parsed as XML:

import scala.util.{Failure, Success, Try}

val isParsableXML = (xml: String) => {
  // Returns true only when the whole string parses as well-formed XML
  Try(scala.xml.XML.loadString(xml)) match {
    case Success(_) => true
    case Failure(_) => false
  }
}

spark.udf.register("is_parsable_xml", isParsableXML)
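
A quick sanity check of the registered UDF (my own illustration, same session): a well-formed snippet should come back true, while an unterminated tag like the one in the failing row should come back false:

spark.sql("SELECT is_parsable_xml('<a><b>1</b></a>') AS well_formed, is_parsable_xml('<a>') AS unterminated").show()
// expected: well_formed = true, unterminated = false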

Then use it in the SQL query:

spark.sql("""
SELECT customer_id,
       response_id,
       CASE
           WHEN (response_id=2
                 AND response_output!='Error') THEN get_json_object(response_output, '$.OrderTime')
           WHEN (response_id=1
                 AND response_output!='Error'
                 AND is_parsable_xml(response_output)) THEN xpath_string(response_output, 'USR_ORD/OrderResponse/USR1OrderTotalTime')
           WHEN ((response_id=1
                  OR response_id=2)
                 AND response_output='Error') THEN 'Error'
           ELSE NULL
       END AS order_time
FROM Customer_Order 
""").show()

//+-----------+-----------+----------+
//|customer_id|response_id|order_time|
//+-----------+-----------+----------+
//|        100|          1|      null|
//|        200|          1|          |
//|        300|          1|     Error|
//|        400|          1|      null|
//|        500|          1|      null|
//|        600|          2|       300|
//|        700|          2|      null|
//|        800|          2|       100|
//|        900|          2|      null|
//|        101|          2|     Error|
//|        202|          2|      null|
//+-----------+-----------+----------+

Now you can write your CASE WHEN logic.
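
As a follow-up sketch of that CASE logic (my own extension, not part of the original answer): the Error and blank rules from the problem statement can be checked before the format-specific branches, and the XPath can be aligned with the nesting of the sample XML (OrderResult/ORDValue sits between OrderResponse and USR1OrderTotalTime), which should make customer 200 return 221 instead of an empty string while the other rows keep the values shown above:

spark.sql("""
SELECT customer_id,
       response_id,
       CASE
           WHEN response_output = 'Error' THEN 'Error'
           WHEN response_output IS NULL OR trim(response_output) = '' THEN NULL
           WHEN response_id = 2 THEN get_json_object(response_output, '$.OrderTime')
           WHEN response_id = 1 AND is_parsable_xml(response_output)
                THEN xpath_string(response_output, 'USR_ORD/OrderResponse/OrderResult/ORDValue/USR1OrderTotalTime')
           ELSE NULL
       END AS order_time
FROM Customer_Order
""").show()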
