我有一个 20 GB 的 XML 文件,其中包含嵌套的 XML 元素。我需要根据标记(XML 中的信息,总共有 36 种类型)把它拆分成多个 XML 文件。下面的代码段可以正确完成这项工作,但问题是耗时太长。如何优化下面的代码?
/** One tag definition used to split the source XML.
  *
  * @param name        short identifier of the tag
  * @param tagNameList values looked up in `Admin.AdminLevel.Level2` of the parsed addresses
  * @param tagCodeList values looked up in `AdditionalData._VALUE` and in the `TagCode` column
  *                    of the incorrect-source rows
  */
case class Tags(
  name: String,
  tagNameList: List[String],
  tagCodeList: List[String]
)
// Parse the source XML; each <Coral> element becomes one row.
// `df` is filtered once per tag and also feeds the join below, so it is cached: without the
// cache every one of those per-tag writes would re-read and re-parse the whole XML file.
// NOTE(review): coalesce(5) is a narrow dependency, so the XML scan itself may run in only
// 5 tasks -- confirm that this limit on parallelism is intended.
val df: DataFrame = spark.read
  .format("com.databricks.spark.xml")
  .option("rootTag", "List")
  .option("rowTag", "Coral")
  .load("test.xml")
  .coalesce(5)
  .cache()

// Header-less CSV: Spark names the columns _c0, _c1, ...; _c0 is renamed to SourceId here.
val m_incorrect_tags_df: DataFrame = spark.read.csv("incorrect.csv")
val m_incorrectTags_df: DataFrame = m_incorrect_tags_df.withColumnRenamed("_c0", "SourceId")

// Join the XML rows ("df1") with the CSV rows ("df2") on the source id, null-safe (<=>).
// The original referenced an undefined `m_df`; the XML DataFrame `df` is what is aliased "df1"
// and later selected via `df1.*`.
val commondf: DataFrame =
  df.as("df1").join(m_incorrectTags_df.as("df2"), $"SourceId" <=> $"Identity.SourceId")

// _c1 of the CSV is exposed as TagCode.
val m_commonDf: DataFrame = commondf.withColumnRenamed("_c1", "TagCode")
// Tag definitions that drive the split. Each entry is built with the `Tags` case class
// (the original called `tags(...)`, i.e. the list itself, which does not compile).
val tags = new ArrayList[Tags]()
tags.add(Tags("TT", List("TT", "Te1", "Test1"), List("TT", "TT1", "Test")))
tags.add(Tags("HP", List("HP", "HP123", "HIM"), List("HPS")))
// ... remaining tag definitions elided in the original post (36 tags in total)
// Builds a SQL predicate that holds when the array expression `arrayExpr` contains at least
// one of `values`. An empty list yields the literal `false`, so the surrounding WHERE clause
// stays syntactically valid instead of degenerating into "... AND ()" or a leading " OR".
def containsAny(arrayExpr: String, values: List[String]): String =
  if (values.isEmpty) "false"
  else values.map(value => s"array_contains($arrayExpr, '$value')").mkString(" OR ")

// For every tag: select the matching XML rows, add the rows whose incorrect-source entry
// carries one of the tag's codes, and write the combined result as XML.
tags.forEach(tag => {
  val whereTagName = containsAny(
    "flatten(LocationList.Location.Address.ParsedList.Parsed).Admin.AdminLevel.Level2",
    tag.tagNameList)
  val whereTagCode = containsAny(
    "flatten(LocationList.Location.AdditionalData)._VALUE",
    tag.tagCodeList)

  // A row matches on a Level2 name, or on a tag code when AdditionalData has a 'tag' key.
  // `df.where` is used directly; the original `select(col("*"))` was a no-op.
  val tagDataFrame = df.where(
    s"$whereTagName OR (array_contains(flatten(LocationList.Location.AdditionalData)._key, 'tag') AND ($whereTagCode))")

  // Rows from the join whose TagCode belongs to this tag, reduced to the XML columns (df1.*)
  // so the positional union below lines up with tagDataFrame.
  val tagDfWithIncorrectSource =
    m_commonDf.where($"TagCode".isInCollection(tag.tagCodeList)).select($"df1.*")

  val tagAllDf = tagDataFrame.union(tagDfWithIncorrectSource)

  // NOTE(review): every iteration writes to the same `tempFilePath`. With Spark's default
  // save mode a write to an already existing path fails -- confirm the path is unique per
  // tag (or that a save mode is configured) where `tempFilePath` is defined.
  tagAllDf.write
    .option("rootTag", "PlaceList")
    .option("rowTag", "Place")
    .xml(tempFilePath)
})
暂无答案!
目前还没有任何答案,快来回答吧!