scala在spark中分解复杂的嵌套xml

643ylb08  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(269)

大家好,
我试图在spark中解析一个xml文件。我正在使用explode函数来展平数据。下面是输入、输出模式和代码。

Input Schema
   root
     |-- _no: string (nullable = true)
     |-- _sc double (nullable = true)
     |-- _xsi: string (nullable = true)
     |-- header: struct (nullable = true)
     |    |-- con: string (nullable = true)
     |    |-- co: string (nullable = true)
     |    |-- cr: date (nullable = true)
     |    |-- pe: string (nullable = true)
     |    |-- st: timestamp (nullable = true)
     |-- scs: struct (nullable = true)
     |    |-- _te: string (nullable = true)
     |    |-- scle: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- cId: long (nullable = true)
     |    |    |    |-- eId: long (nullable = true)
     |    |    |    |-- ent: array (nullable = true)
     |    |    |    |    |-- eent: struct (containsNull = true)
     |    |    |    |    |    |-- MSId: string (nullable = true)
     |    |    |    |    |    |-- _date: date (nullable = true)
     |    |    |    |    |    |-- uas: string (nullable = true)
     |    |    |    |    |    |-- tes: struct (nullable = true)
     |    |    |    |    |    |    |-- time: string (nullable = true)
     |    |    |    |    |    |-- tv: uct (nullable = true)
     |    |    |    |    |    |    |-- LUE: string (nullable = true)
     |    |    |    |    |    |    |-- _dour: string (nullable = true)
     |    |    |    |    |    |    |-- nete: string (nullable = true)
     |    |    |    |    |    |    |-- netSy: string (nullable = true)
     |    |    |    |    |    |    |-- parum: struct (nullable = true)
     |    |    |    |    |    |    |    |-- UE: long (nullable = true)
     |    |    |    |    |    |    |    |-- Parts: long (nullable = true)
     |    |    |    |    |    |    |-- sa: struct (nullable = true)
     |    |    |    |    |    |    |    |-- VA: boolean (nullable = true)
     |    |    |    |    |    |    |    |-- ng: string (nullable = true)
     |    |    |    |    |    |    |-- stitled: struct (nullable = true)
     |    |    |    |    |    |    |    |-- LUE: boolean (nullable = true)
     |    |    |    |    |    |    |    |-- ng: string (nullable = true)
     |    |    |    |    |    |    |-- tvering: struct (nullable = true)
     |    |    |    |    |    |    |    |-- dfLUE: string (nullable = true)
     |    |    |    |    |    |    |    |-- _body: string (nullable = true)
     |    |    |    |    |    |    |-- ubting: struct (nullable = true)
     |    |    |    |    |    |    |    |-- _LUE: string (nullable = true)
     |    |    |    |    |    |    |    |-- dy: string (nullable = true)

需要输出。

root
 |-- _no: string (nullable = true)
 |-- _sc double (nullable = true)
 |-- _xsi: string (nullable = true)
 |-- header: struct (nullable = true)
 |-- con: string (nullable = true)
 |-- co: string (nullable = true)
 |-- cr: date (nullable = true)
 |-- pe: string (nullable = true)
 |-- st: timestamp (nullable = true)
 |-- scs: struct (nullable = true)
 |-- _te: string (nullable = true)
 |-- scle: array (nullable = true)
 |-- element: struct (containsNull = true)
 |-- cId: long (nullable = true)
 |-- eId: long (nullable = true)
 |-- ent: array (nullable = true)
 |-- eent: struct (containsNull = true)
 |-- MSId: string (nullable = true)
 |-- _date: date (nullable = true)
 |-- uas: string (nullable = true)
 |-- tes: struct (nullable = true)
 |-- time: string (nullable = true)
 |-- tv: uct (nullable = true)
 |-- LUE: string (nullable = true)
 |-- _dour: string (nullable = true)
 |-- nete: string (nullable = true)
 |-- netSy: string (nullable = true)
 |-- parum: struct (nullable = true)
 |-- UE: long (nullable = true)
 |-- Parts: long (nullable = true)
 |-- sa: struct (nullable = true)
 |-- VA: boolean (nullable = true)
 |-- ng: string (nullable = true)
 |-- stitled: struct (nullable = true)
 |-- LUE: boolean (nullable = true)
 |-- ng: string (nullable = true)
 |-- tvering: struct (nullable = true)
 |-- dfLUE: string (nullable = true)
 |-- _body: string (nullable = true)
 |-- ubting: struct (nullable = true)
 |-- _LUE: string (nullable = true)
 |-- dy: string (nullable = true)

xml文件的大小为100mb,当我读取xml文件时,Dataframe的计数显示为1。我相信spark正在将整个xml文件读入一行。
用来爆炸的代码,

val readxml = spark.read.format("xml").option("rowTag","on")\
   .option("inferschema","true").load("/path")
val co= readxml.withColumn("cId", explode(col("scs.scle.cId")))
  .withColumn("eId", explode(col("scs.scle.schedule.eId")))
  .withColumn("exploded_sc", explode(col("scs.scle.ent")))
  .withColumn("uas", explode(col("exploded_sc.uas")))
  .withColumn("ag_dt", explode(col("exploded_sc._date")))
  .withColumn("time", explode(col("exploded_sc.tes.time")))
  .withColumn("MSId", explode(col("exploded_sc.MSId")))
  .withColumn("exploded_tv", explode(col("exploded_sc.tv")))
val finalDF = co.select(col("_sc"),col("header.*"),col("scs._te").as("_te"),
       col("cId"),col("eId"),                          
       col("MSId"),col("time"), col("exploded_tv._dour").as("_dour"),
       col("exploded_tv.tvering.dfLUE").as("tvra"),
       col("exploded_tv.tvering._body").as("body"),
       col("exploded_tv.parum.UE").as("pnum"),
       col("exploded_tv.parum.Parts").as("npart"),
       col("exploded_tv.ubting._LUE").as("tsting"),
       col("exploded_tv.ubting.dy").as("tsting_body"),
       col("exploded_tv.nete").as("netsrce"),
       col("exploded_tv.netSy").as("nettype"),
       col("exploded_tv.sa.VA").as("sp"),
       col("exploded_tv.sa.ng").as("lg"),
       col("exploded_tv.stitled.dfLUE").as("subled"),
       col("exploded_tv.stitled.ng").as("sud_ng"),
       col("uas"),col("ag_dt"))

我从上面的代码中得到所需的输出模式。但我无法查看finaldf的数据,我怀疑是因为数据太大。然后我在读取xml文件后开始计算每次爆炸的次数。然后我知道explode函数由于重复而以指数方式增加行数。是否有其他方法来实现上述输出。有人能帮忙吗。先谢谢你

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题