基于count遍历xml并使用sparkscala创建arraystring

yebdmbv4  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(259)

通过遍历循环来解析xml,并使用循环创建字符串数组

<USR_ORD><OrderResponse>
<OrderCount1>3</OrderCount1>
<OrderResult><orders>
<order>
<name>A</name><address>AAA</address><number>A1</number><status></status>
</order>
<order>
<name>B</name><number>B1</number>
</order>
<order>
<name>C</name><address>CCC</address><number>C1</number><status></status>
</order>
</orders></OrderResult>
</OrderResponse></USR_ORD>

我的代码如下

//creating list
val myList=List((100,1,"<USR_ORD><OrderResponse><OrderCount1>3</OrderCount1><OrderResult><orders><order><name>A</name><address>AAA</address><number>A1</number><status></status></order><order><name>B</name><number>B1</number></order><order><name>C</name><address>CCC</address><number>C1</number><status></status></order></orders></OrderResult></OrderResponse></USR_ORD>"))

//creating dataframe and temp table
val rdd = spark.sparkContext.parallelize(myList);
val DF1 = rdd.toDF("customer_id","response_id","response_output")
DF1.createOrReplaceTempView("ord_tbl");

spark.sql("""select * from ord_tbl""").show(10,false)
+-----------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customer_id|response_id|response_output                                                                                                                                                                                                                                                                                                                                         |
+-----------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100        |1          |<USR_ORD><OrderResponse><OrderCount1>3</OrderCount1><OrderResult><orders><order><name>A</name><address>AAA</address><number>A1</number><status></status></order><order><name>B</name><number>B1</number></order><order><name>C</name><address>CCC</address><number>C1</number><status></status></order></orders></OrderResult></OrderResponse></USR_ORD>|
+-----------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

使用xpath标准函数

spark.sql("""select xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/orders/order/name/text()') as name
      ,xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/orders/order/address/text()') as address
      , xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/orders/order/status/text()') as status from ord_tbl""").show(10,false)
+---------+----------+------+
|name     |address   |status|
+---------+----------+------+
|[A, B, C]|[AAA, CCC]|[]    |
+---------+----------+------+

但预期的Dataframe应该如下所示

+---------+----------+------+
|name     |address   |status|
+---------+----------+------+
|[A, B, C]|[AAA,,CCC]|[,,]  |
+---------+----------+------+

我尝试使用loop作为参考的东西,我肯定是错的,没有编译

def (inorders:Int,inOrderCount:int,partxpathstring1:String,partxpathstring2:String,)
val orders=inorders
val OrderCount=inOrderCount
var i=0
var j=0
for (i <- 1 to orders){
    for(j <- 1 to OrderCount){
    fullxpath=xpath_string(response_output,'$partxpathstring1+[i]+$partxpathstring2+[j]')
    fullxpath+=fullxpath
    )
j+=1
    }
i+1
}
5anewei6

5anewei61#

Spark xpath 函数似乎过滤xml节点中的空值。您可能需要使用自定义项来处理此问题。下面是一个使用 scala.xml.XML 分析列的步骤 response_output :

val parse_orders = udf((response: String) => {
  val xml = scala.xml.XML.loadString(response)

  val orderCount = (xml \\ "USR_ORD" \ "OrderResponse" \ "OrderCount1").text
  val orders = xml \\ "USR_ORD" \ "OrderResponse" \ "OrderResult" \ "orders" \ "order"

  val orderInfo = Seq("name", "address", "number", "status").map { node =>
    (node -> (orders \ node).map(_.text))
  }.toMap

  (orderCount, orderInfo)
})

val df1 = df.withColumn("parsed", parse_orders(col("response_output")))
  .select(
    col("customer_id"),
    col("response_id"),
    col("parsed._1").as("orderCount"),
    col("parsed._2.name").as("name"),
    col("parsed._2.address").as("address"),
    col("parsed._2.number").as("number"),
    col("parsed._2.status").as("status")
  )  

df1.show(false)
//+-----------+-----------+----------+---------+------------+------------+------+
//|customer_id|response_id|orderCount|name     |address     |number      |status|
//+-----------+-----------+----------+---------+------------+------------+------+
//|100        |1          |3         |[A, B, C]|[AAA, , CCC]|[A1, B1, C1]|[, , ]|
//+-----------+-----------+----------+---------+------------+------------+------+

相关问题