scala - How to apply changes stored in one DataFrame to another DataFrame?

b09cbbtk posted on 2021-05-27 in Spark

My base DataFrame looks like this: HeroNamesDF

id gen name  surname supername
1  1   Clarc Kent    BATMAN
2  1   Bruce Smith   BATMAN
3  2   Clark Kent    SUPERMAN

Then I have another DataFrame with corrections: CorrectionsDF

id gen attribute value
1  1   supername SUPERMAN
1  1   name      Clark
2  1   surname   Wayne
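To pin down what "applying the corrections" means independently of Spark, here is a minimal plain-Scala model (an illustration, not the asker's code): corrections are grouped per (id, gen) into an attribute -> value map, and each hero row takes the corrected value where one exists and keeps the original otherwise. The case classes Hero and Correction and the object ApplyCorrectionsModel are hypothetical names that only mirror the two tables above.

```scala
// Plain-Scala model (no Spark) of applying CorrectionsDF to HeroNamesDF.
// Hero, Correction and ApplyCorrectionsModel are hypothetical names.
final case class Hero(id: Int, gen: Int, name: String, surname: String, supername: String)
final case class Correction(id: Int, gen: Int, attribute: String, value: String)

object ApplyCorrectionsModel {
  def applyCorrections(heroes: Seq[Hero], corrections: Seq[Correction]): Seq[Hero] = {
    // (id, gen) -> (attribute -> corrected value)
    val byKey: Map[(Int, Int), Map[String, String]] =
      corrections
        .groupBy(c => (c.id, c.gen))
        .map { case (key, cs) => key -> cs.map(c => c.attribute -> c.value).toMap }

    heroes.map { h =>
      val fixes = byKey.getOrElse((h.id, h.gen), Map.empty[String, String])
      // Use the corrected value when present, otherwise keep the original one
      h.copy(
        name = fixes.getOrElse("name", h.name),
        surname = fixes.getOrElse("surname", h.surname),
        supername = fixes.getOrElse("supername", h.supername)
      )
    }
  }

  def main(args: Array[String]): Unit = {
    val heroes = Seq(
      Hero(1, 1, "Clarc", "Kent", "BATMAN"),
      Hero(2, 1, "Bruce", "Smith", "BATMAN"),
      Hero(3, 2, "Clark", "Kent", "SUPERMAN")
    )
    val corrections = Seq(
      Correction(1, 1, "supername", "SUPERMAN"),
      Correction(1, 1, "name", "Clark"),
      Correction(2, 1, "surname", "Wayne")
    )
    applyCorrections(heroes, corrections).foreach(println)
  }
}
```

Rows with no corrections (id 3) pass through unchanged, which is the behavior a distributed version has to preserve as well.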

My current solution looks like this:

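import org.apache.spark.sql.functions.{col, lit, when}

// For every (id, gen) pair that has corrections, bring the key to the driver.
// The result is an Array of one-row DataFrames, one per corrected (id, gen);
// pairs without corrections (e.g. id 3) are not part of it.
CorrectionsDF.select("id", "gen").distinct().collect().map(r => {
  val id = r(0)
  val gen = r(1)

  // Corrections and original rows for this (id, gen) pair
  val corrections = CorrectionsDF.filter(col("id") === lit(id) and col("gen") === lit(gen))
  val candidates  = HeroNamesDF.filter(col("id") === lit(id) and col("gen") === lit(gen))

  // For each non-key column, replace the value if a correction exists for it
  candidates.columns.filterNot(c => c == "id" || c == "gen").map(column => {
    val change = corrections.where(col("attribute") === lit(column)).select("id", "gen", "value")

    candidates.select("id", "gen", column)
      .join(change, Seq("id", "gen"), "full")
      .withColumn(column, when(col("value").isNotNull, col("value")).otherwise(col(column)))
      .drop("value")
  }).reduce((df1, df2) => df1.join(df2, Seq("id", "gen")))
})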

Expected output:

id gen name  surname supername
1  1   Clark Kent    SUPERMAN
2  1   Bruce Wayne   BATMAN
3  2   Clark Kent    SUPERMAN

I'd like to get rid of the .collect(), but I can't get it to work.
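One way to avoid .collect() entirely is to pivot CorrectionsDF so that every attribute becomes its own column, left join it onto the hero table on (id, gen), and coalesce each corrected column with the original. This is a different technique (pivot) from the map-based answer below. It is a sketch under these assumptions: Spark 2.x or 3.x, the column names from the tables above, the set of correctable attributes known up front (the hypothetical attrs list), and at most one correction per (id, gen, attribute), since first("value") would otherwise pick an arbitrary one. PivotCorrections is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, first}

object PivotCorrections {
  // Hypothetical: the non-key columns that corrections may target
  val attrs: Seq[String] = Seq("name", "surname", "supername")

  def applyCorrections(heroes: DataFrame, corrections: DataFrame): DataFrame = {
    // One row per (id, gen), one column per attribute, null where there is no correction.
    // Passing the values to pivot avoids an extra job to discover the distinct attributes.
    val pivoted = corrections
      .groupBy("id", "gen")
      .pivot("attribute", attrs)
      .agg(first("value"))
      .select(col("id") +: col("gen") +: attrs.map(a => col(a).as(s"corr_$a")): _*)

    // Left join keeps heroes without any correction (e.g. id 3)
    heroes
      .join(pivoted, Seq("id", "gen"), "left")
      .select(col("id") +: col("gen") +: attrs.map(a => coalesce(col(s"corr_$a"), col(a)).as(a)): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pivot-corrections").getOrCreate()
    import spark.implicits._

    val heroNamesDF = Seq(
      (1, 1, "Clarc", "Kent", "BATMAN"),
      (2, 1, "Bruce", "Smith", "BATMAN"),
      (3, 2, "Clark", "Kent", "SUPERMAN")
    ).toDF("id", "gen", "name", "surname", "supername")

    val correctionsDF = Seq(
      (1, 1, "supername", "SUPERMAN"),
      (1, 1, "name", "Clark"),
      (2, 1, "surname", "Wayne")
    ).toDF("id", "gen", "attribute", "value")

    applyCorrections(heroNamesDF, correctionsDF).orderBy("id").show()
    spark.stop()
  }
}
```

Everything stays a DataFrame transformation, so Spark plans it as one job instead of one small job per corrected row.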

Answer #1 by ugmeyewa:

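If I understand the example correctly, a left join combined with a groupBy should be enough in your case (a left rather than an inner join, so that heroes without corrections, like id 3, are kept). In the groupBy, collect_list and map_from_arrays build a map holding the corrections for each id/gen pair, e.g. for id 1, gen 1: {"supername" : "SUPERMAN", "name" : "Clark"}. coalesce then takes the corrected value from the map when there is one and falls back to the original column otherwise: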

import org.apache.spark.sql.functions.{coalesce, collect_list, first, map_from_arrays}
import spark.implicits._ // enables the $"col" syntax (assumes a SparkSession named spark)

val hdf = (load hero df)
val cdf = (load corrections df)

// Left join so that heroes without corrections (id 3) are kept
hdf.join(cdf, Seq("id", "gen"), "left")
   .groupBy("id", "gen")
   .agg(
      // One map per (id, gen): attribute -> corrected value
      map_from_arrays(
         collect_list("attribute"), // the keys
         collect_list("value")      // the values
      ).as("m"),
      first("name").as("name"),
      first("surname").as("surname"),
      first("supername").as("supername")
    )
   .select(
     $"id",
     $"gen",
     // Take the corrected value if there is one, otherwise keep the original
     coalesce($"m".getItem("name"), $"name").as("name"),
     coalesce($"m".getItem("surname"), $"surname").as("surname"),
     coalesce($"m".getItem("supername"), $"supername").as("supername")
   )
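The map_from_arrays call relies on the two independent collect_list aggregations producing keys and values in matching order. That holds in practice within one aggregation, but it is not something the collect_list documentation promises. A variant that keeps each attribute physically paired with its value is map_from_entries over a list of (attribute, value) structs; the sketch below assumes Spark 2.4+ (where map_from_entries exists) and at most one correction per attribute for each (id, gen), because duplicate map keys are rejected by default in Spark 3. The when(...) guard turns the all-null rows produced by the left join into nulls, which collect_list skips, so no null map key is created. MapCorrections is a hypothetical name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, collect_list, first, map_from_entries, struct, when}

object MapCorrections {
  def applyCorrections(hdf: DataFrame, cdf: DataFrame): DataFrame =
    hdf.join(cdf, Seq("id", "gen"), "left")
      .groupBy("id", "gen")
      .agg(
        // Each (attribute, value) pair stays together in one struct; rows without a
        // correction become null and are dropped by collect_list
        map_from_entries(
          collect_list(when(col("attribute").isNotNull, struct(col("attribute"), col("value"))))
        ).as("m"),
        first("name").as("name"),
        first("surname").as("surname"),
        first("supername").as("supername")
      )
      .select(
        col("id"),
        col("gen"),
        coalesce(col("m").getItem("name"), col("name")).as("name"),
        coalesce(col("m").getItem("surname"), col("surname")).as("surname"),
        coalesce(col("m").getItem("supername"), col("supername")).as("supername")
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("map-corrections").getOrCreate()
    import spark.implicits._

    val hdf = Seq(
      (1, 1, "Clarc", "Kent", "BATMAN"),
      (2, 1, "Bruce", "Smith", "BATMAN"),
      (3, 2, "Clark", "Kent", "SUPERMAN")
    ).toDF("id", "gen", "name", "surname", "supername")

    val cdf = Seq(
      (1, 1, "supername", "SUPERMAN"),
      (1, 1, "name", "Clark"),
      (2, 1, "surname", "Wayne")
    ).toDF("id", "gen", "attribute", "value")

    applyCorrections(hdf, cdf).orderBy("id").show()
    spark.stop()
  }
}
```

For id 3 the collected list is empty, so m is an empty map, getItem returns null, and coalesce keeps the original values.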
