Optimizing array filtering in Spark Scala

mlnl4t2r posted on 2021-07-13 in Spark

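I have a 20 GB XML file with nested elements. I split it into multiple XML outputs by tag (values stored inside the XML; there are 36 tag types in total). The snippet below works, but it takes far too long. How can I optimize it?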

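import org.apache.spark.sql.DataFrame
import com.databricks.spark.xml._ // adds the .xml(path) writer used below
import spark.implicits._          // enables the $"colName" syntax

case class Tags(name: String, tagNameList: List[String], tagCodeList: List[String])

// coalesce(5) limits the scan of the 20 GB file to at most 5 parallel tasks
val df: DataFrame = spark.read
      .format("com.databricks.spark.xml")
      .option("rootTag", "List")
      .option("rowTag", "Coral")
      .load("test.xml").coalesce(5)

val m_incorrect_tags_df: DataFrame = spark.read.csv("incorrect.csv")
val m_incorrectTags_df: DataFrame = m_incorrect_tags_df.withColumnRenamed("_c0", "SourceId")
// m_df is not defined in this snippet; presumably it is the XML DataFrame df read above
val commondf: DataFrame = m_df.as("df1").join(m_incorrectTags_df.as("df2"), $"SourceId" <=> $"Identity.SourceId")
val m_commonDf: DataFrame = commondf.withColumnRenamed("_c1", "TagCode")

val tags: List[Tags] = List(
  Tags("TT", List("TT", "Te1", "Test1"), List("TT", "TT1", "Test")),
  Tags("HP", List("HP", "HP123", "HIM"), List("HPS"))
  // ... total 36 tags
)

tags.foreach { tag =>
  // Level2 contains any of this tag's names
  val wheretagName =
    tag.tagNameList.map(tagName => s"array_contains(flatten(LocationList.Location.Address.ParsedList.Parsed).Admin.AdminLevel.Level2, '$tagName')").mkString(" OR ")

  // AdditionalData._VALUE contains any of this tag's codes
  val wheretagCode =
    tag.tagCodeList.map(tagCode => s"array_contains(flatten(LocationList.Location.AdditionalData)._VALUE, '$tagCode')").mkString(" OR ")

  // df is not cached, so every iteration re-reads and re-parses test.xml
  val tagDataFrame = df
    .where(wheretagName + " OR (array_contains(flatten(LocationList.Location.AdditionalData)._key, 'tag') AND (" + wheretagCode + "))")

  // Rows whose source is listed in incorrect.csv with one of this tag's codes
  val tagDfWithIncorrectSource = m_commonDf.where($"TagCode".isInCollection(tag.tagCodeList)).select($"df1.*")

  // union matches columns by position, so both sides must share df's column order
  val tagAllDf = tagDataFrame.union(tagDfWithIncorrectSource)
  tagAllDf.write
      .option("rootTag", "PlaceList")
      .option("rowTag", "Place")
      // tempFilePath is defined elsewhere; one subdirectory per tag, because writing all
      // 36 outputs to the same path fails with "path already exists" under the default mode
      .xml(s"$tempFilePath/${tag.name}")
}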
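The loop builds each tag's condition as an OR chain with one `array_contains` per value. Spark SQL (2.4 and later) also provides `arrays_overlap(a1, a2)`, which is true when the two arrays share a non-null element. That means each chain can be written as a single call against an `array(...)` literal. The sketch below only builds the SQL string. `TagPredicates` and `whereClause` are hypothetical names, and the column paths are copied from the question. It also escapes backslashes and single quotes in the values, assuming the default `spark.sql.parser.escapedStringLiterals=false`, under which backslash is an escape character in Spark SQL string literals. When nothing matches and an array contains nulls, both forms yield null, which `where` treats as false. The new form should therefore select the same rows as the original chains, but that is worth checking on a sample. On its own it does not remove the cost of re-reading the XML in every iteration.

```scala
object TagPredicates {
  // Own copy of the question's Tags shape so this block compiles on its own.
  final case class Tags(name: String, tagNameList: List[String], tagCodeList: List[String])

  // Column paths copied from the question's where-clause.
  private val level2 = "flatten(LocationList.Location.Address.ParsedList.Parsed).Admin.AdminLevel.Level2"
  private val additionalData = "flatten(LocationList.Location.AdditionalData)"

  // Renders a Spark SQL string literal, escaping backslashes first, then single quotes.
  private def literal(s: String): String =
    "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"

  // Renders e.g. array('HP', 'HP123').
  private def arrayLiteral(values: List[String]): String =
    values.map(literal).mkString("array(", ", ", ")")

  // One arrays_overlap call per list replaces each chain of OR-ed array_contains calls.
  def whereClause(tag: Tags): String =
    s"arrays_overlap($level2, ${arrayLiteral(tag.tagNameList)}) OR " +
      s"(array_contains($additionalData._key, 'tag') AND " +
      s"arrays_overlap($additionalData._VALUE, ${arrayLiteral(tag.tagCodeList)}))"
}
```

Inside the loop this would be used as `df.where(TagPredicates.whereClause(tag))`, after converting the question's `Tags` to the sketch's copy or dropping the copy.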

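Two properties of the loop in the question probably dominate the runtime. First, `df` is never persisted, so each of the 36 writes re-reads and re-parses the 20 GB XML. Second, the jobs run strictly one after another, and `coalesce(5)` limits each job's XML scan to at most 5 tasks. A common mitigation, sketched here under assumptions and not tested against this data, has two parts. Persist `df` once (for example `df.persist(StorageLevel.MEMORY_AND_DISK)`). Then submit the per-tag writes from several driver threads. Spark's scheduler accepts jobs from multiple threads, so independent writes can use cores that a single 5-task job leaves idle. The helper below is plain Scala. The name `runConcurrently` and the pool size are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ConcurrentTagWrites {
  // Runs `work` once per item on a fixed-size thread pool and waits for all of them.
  // Results come back in input order; if any task throws, Await.result rethrows it.
  def runConcurrently[A, B](items: Seq[A], parallelism: Int)(work: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = items.map(item => Future(work(item)))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdown() // stop accepting new tasks; already running ones finish normally
    }
  }

  // Hypothetical wiring with the question's names (not compiled here):
  //   df.persist(StorageLevel.MEMORY_AND_DISK)   // parse test.xml once
  //   runConcurrently(tags, parallelism = 4) { tag =>
  //     /* build tagAllDf for `tag` as in the loop, then write it */
  //   }
  //   df.unpersist()
}
```

Persisting trades executor memory (with spill to disk under `MEMORY_AND_DISK`) for skipping 35 re-parses of the XML. If the parsed data is too large for that, writing it once to Parquet and reading that back before the loop serves the same purpose.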
