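I have a scenario where I need to parse XML and JSON values depending on another field. The Customer_Order table has two columns, response_id and response_output. response_output contains a mix of JSON strings, XML strings, "Error", blanks, and nulls.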
I need to solve the following problem.
Problem statement
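If response_id=1 and response_output contains valid JSON, apply the JSON logic.
If response_id=1 and response_output does not contain valid JSON, return null.
If response_id=1 and response_output is an XML value, return null.
If response_id=1 and response_output is "Error", return "Error".
If response_id=1 and response_output is blank or null, return null.
If response_id=2 and response_output contains valid XML, apply the XML logic.
If response_id=2 and response_output does not contain valid XML, return null.
If response_id=2 and response_output is a JSON value, return null.
If response_id=2 and response_output is "Error", return "Error".
If response_id=2 and response_output is blank or null, return null.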
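When I try to implement the above in Spark SQL, my code crashes as soon as it hits invalid XML or invalid JSON.
My query and the error are below. Can anyone help me handle this?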
spark.sql("""select
customer_id,
response_id,
CASE WHEN (response_id=2 and response_output!="Error") THEN get_json_object(response_output, '$.Metrics.OrderResponseTime')
WHEN (response_id=1 and response_output!="Error") THEN xpath_string(response_output,'USR_ORD/OrderResponse/USR1OrderTotalTime')
WHEN ((response_id=1 or response_id=2) and response_output="Error") THEN "Error"
ELSE null END as order_time
from Customer_Order""").show()
This is the error I get when running the query above. How can I handle invalid XML or JSON?
Driver stacktrace:
21/02/05 00:48:06 INFO scheduler.DAGScheduler: Job 5 failed: show at Engine.scala:221, took 1.099890 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 10.0 failed 4 times, most recent failure: Lost task 3.3 in stage 10.0 (TID 83, srwredf2021.analytics1.test.dev.corp, executor 3): java.lang.RuntimeException: Invalid XML document: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 1234; XML document structures must start and end within the same entity.
<USR_ORD><OrderResult><ORDTime>2021-02-02 10:34:19</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult><OrderResponse></USR_ORD>
My code snippet and data, for reference:
Data list
val custList = List((100,1,"<USR_ORD><OrderResult><ORDTime>2021-02-02 10:34:19</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult><OrderResponse></USR_ORD>"),
(200,1,"<USR_ORD><OrderResponse><OrderResult><ORDTime>2021-02-02 21:13:12</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>"),
(300,1,"Error"),
(400,1,""),
(500,1,"""{"OrderResponseTime":"2021-02-02 11:34:19", "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 }"""),
(600,2,"""{"OrderResponseTime":"2021-02-02 15:14:13", "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 }"""),
(700,2,"""{"OrderResponseTime":"2021-02-02 12:38:26", "OrderResponse":"COMPLETE", "OrderTime":200, "USR1Order":null "USR1Orderqut":4} """),
(800,2,"""{"OrderResponseTime":"2021-02-02 13:24:19", "OrderResponse":"COMPLETE", "OrderTime":100, "USR1Order":null} "USR1Orderqut":1}"""),
(900,2,"<USR_ORD><OrderResponse><OrderResult><ORDTime>2021-02-02 01:12:49</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>"),
(101,2,"Error"),
(202,2,""));
Loading the list into an RDD
import spark.implicits._  // required for rdd.toDF outside spark-shell
val rdd = spark.sparkContext.parallelize(custList)
Assigning the column names
val DF1 = rdd.toDF("customer_id","response_id","response_output")
Registering the temp view
DF1.createOrReplaceTempView("Customer_Order")
Printing the schema
spark.sql("""select customer_id,response_id,response_output from Customer_Order""").printSchema()
root
|-- customer_id: integer (nullable = false)
|-- response_id: integer (nullable = false)
|-- response_output: string (nullable = true)
Displaying the records
spark.sql("""select customer_id,response_id,response_output from Customer_Order""").show()
+-----------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customer_id|response_id|response_output |
+-----------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100 |1 |<USR_ORD><OrderResult><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult><OrderResponse></USR_ORD> |
|200 |1 |<USR_ORD><OrderResponse><OrderResult><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>|
|300 |1 |Error |
|400 |1 | |
|500 |1 |{ "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 } |
|600 |2 |{ "OrderResponse":"COMPLETE", "OrderTime":300, "USR1Order":null, "USR1Orderqut":10 } |
|700 |2 |{ "OrderResponse":"COMPLETE", "OrderTime":200, "USR1Order":null "USR1Orderqut":4} |
|800 |2 |{ "OrderResponse":"COMPLETE", "OrderTime":100, "USR1Order":null} "USR1Orderqut":1} |
|900 |2 |<USR_ORD><OrderResponse><OrderResult><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>|
|101 |2 |Error |
|202 |2 | |
+-----------+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Answer
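For JSON you don't need to validate anything: get_json_object does not fail when the path does not exist or the JSON is invalid, it simply returns null. To avoid the exception when extracting values from XML, you can use a UDF that checks whether the string in response_output can be parsed as XML, and then call that UDF in the SQL query.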
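Only the XML branch needs such a guard, because xpath_string throws on a document that is not well-formed; the row-100 document, for example, opens `<OrderResponse>` after `</OrderResult>` and never closes it. The answer's own UDF code is not included here, so this is only a minimal sketch, assuming the JDK's built-in DOM parser is acceptable for the check. `XmlValidation` and `isValidXml` are hypothetical names.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import org.xml.sax.{ErrorHandler, InputSource, SAXException, SAXParseException}

object XmlValidation {
  // Rethrow parse errors instead of letting the default handler print "[Fatal Error]" to stderr.
  private val rethrowHandler: ErrorHandler = new ErrorHandler {
    def warning(e: SAXParseException): Unit = ()
    def error(e: SAXParseException): Unit = throw e
    def fatalError(e: SAXParseException): Unit = throw e
  }

  /** True only if `s` is non-null, non-blank and well-formed XML (hypothetical helper). */
  def isValidXml(s: String): Boolean =
    s != null && s.trim.nonEmpty && {
      // DocumentBuilder is not thread-safe, so a fresh one is built per call.
      val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      builder.setErrorHandler(rethrowHandler)
      try {
        builder.parse(new InputSource(new StringReader(s)))
        true
      } catch {
        case _: SAXException => false
      }
    }
}
```

To call it from SQL it would be registered first, e.g. `spark.udf.register("isValidXml", (s: String) => XmlValidation.isValidXml(s))`. The function is null-safe on purpose, since Spark passes SQL nulls to a String parameter as `null`. Building a parser per row is simple but not the fastest option for large tables.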
Now you can write your CASE logic.
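A sketch of that CASE, assuming a Boolean UDF registered under the hypothetical name `isValidXml` that returns true only for well-formed XML. Spark evaluates a CASE branch's value only when its condition is true, so xpath_string is never reached for documents that fail the check; Spark does not promise left-to-right short-circuiting inside AND/OR, which is why the guarded call sits in a THEN branch. The JSON path, the XPath, and the response_id-to-format mapping are copied unchanged from the question's query; in the sample rows the XML value actually sits under `OrderResponse/OrderResult/ORDValue`, so the paths may need adjusting. The SQL is held in a Scala string so it can be passed to `spark.sql`.

```scala
object OrderTimeQuery {
  // Assumes a Boolean UDF registered under the hypothetical name isValidXml.
  // JSON path, XPath and the response_id-to-format mapping are copied from the question.
  val sql: String =
    """select
      |  customer_id,
      |  response_id,
      |  CASE
      |    -- "Error" rows first, so later branches need no != 'Error' check
      |    WHEN response_id IN (1, 2) AND response_output = 'Error' THEN 'Error'
      |    -- get_json_object returns null rather than throwing on non-JSON input
      |    WHEN response_id = 2
      |      THEN get_json_object(response_output, '$.Metrics.OrderResponseTime')
      |    -- xpath_string is only evaluated when the document parses
      |    WHEN response_id = 1 AND isValidXml(response_output)
      |      THEN xpath_string(response_output, 'USR_ORD/OrderResponse/USR1OrderTotalTime')
      |    ELSE null
      |  END AS order_time
      |from Customer_Order""".stripMargin
}
```

Usage, once the UDF is registered: `spark.sql(OrderTimeQuery.sql).show(false)`. Blank, null, JSON-in-XML-slot and malformed-XML rows all fall through to `ELSE null`.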