My base DataFrame looks like this (HeroNamesDF):
id gen name surname supername
1 1 Clarc Kent BATMAN
2 1 Bruce Smith BATMAN
3 2 Clark Kent SUPERMAN
Then I have another DataFrame with corrections (CorrectionsDF):
id gen attribute value
1 1 supername SUPERMAN
1 1 name Clark
2 1 surname Wayne
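My current solution looks like this:
import org.apache.spark.sql.functions.{col, lit, when}

// For every (id, gen) pair that has corrections, patch the matching HeroNamesDF rows
val corrected = CorrectionsDF.select("id", "gen").distinct().collect().map(r => {
  val id = r(0)
  val gen = r(1)
  val corrections = CorrectionsDF.filter(col("id") === lit(id) and col("gen") === lit(gen))
  val candidates = HeroNamesDF.filter(col("id") === lit(id) and col("gen") === lit(gen))
  // Patch each data column; the key columns id/gen are skipped to avoid duplicate columns
  candidates.columns.filterNot(c => Seq("id", "gen").contains(c)).map(column => {
    val change = corrections.where(col("attribute") === lit(column)).select("id", "gen", "value")
    candidates.select("id", "gen", column)
      .join(change, Seq("id", "gen"), "full")
      .withColumn(column, when(col("value").isNotNull, col("value")).otherwise(col(column)))
      .drop("value")
  }).reduce((df1, df2) => df1.join(df2, Seq("id", "gen")))
}).reduce(_ union _) // combine the per-pair results; rows without corrections (id 3) are not included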
Expected output:
id gen name surname supername
1 1 Clark Kent SUPERMAN
2 1 Bruce Wayne BATMAN
3 2 Clark Kent SUPERMAN
I would like to get rid of the .collect(), but I can't get it to work.
1 answer
ugmeyewa:
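If I understood the example correctly, an inner join combined with a groupBy should be enough in your case. In the groupBy we use collect_list and map_from_arrays to build a Map that holds the aggregated data of each id/gen pair, i.e.
{"name" : "Clarc", "surname" : "Kent", "supername" : "BATMAN"}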
:
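A minimal sketch of the groupBy + collect_list + map_from_arrays idea, assuming Spark 2.4 or later (map_from_arrays was added in 2.4), that id/gen are the only key columns, and that each (id, gen) has at most one correction per attribute. HeroCorrections and applyCorrections are hypothetical names. Two deliberate deviations from the text above: the (attribute, value) pairs are collected as one list of structs so the key and value arrays stay aligned, and the final join is a left join rather than an inner join so heroes without any correction (id 3) are kept. Nothing is collected to the driver.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, collect_list, map_from_arrays, struct}

object HeroCorrections {
  // Assumption: every column except the id/gen keys may be corrected.
  def applyCorrections(heroes: DataFrame, corrections: DataFrame): DataFrame = {
    val keys = Seq("id", "gen")
    val dataCols = heroes.columns.filterNot(c => keys.contains(c))

    // One row per (id, gen) with a Map(attribute -> corrected value).
    // Collecting structs keeps attribute and value aligned in a single list.
    // Assumption: no duplicate attribute per (id, gen), otherwise map_from_arrays may reject the duplicate key.
    val fixes = corrections
      .groupBy(keys.map(col): _*)
      .agg(collect_list(struct(col("attribute"), col("value"))).as("entries"))
      .select(col("id"), col("gen"),
        map_from_arrays(col("entries.attribute"), col("entries.value")).as("fixes"))

    // Left join keeps heroes without corrections; a null map or a missing key
    // gives null, so coalesce falls back to the original column value.
    heroes
      .join(fixes, keys, "left")
      .select(keys.map(col) ++ dataCols.map(c => coalesce(col("fixes").getItem(c), col(c)).as(c)): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("hero-corrections").getOrCreate()
    import spark.implicits._

    val heroes = Seq(
      (1, 1, "Clarc", "Kent", "BATMAN"),
      (2, 1, "Bruce", "Smith", "BATMAN"),
      (3, 2, "Clark", "Kent", "SUPERMAN")
    ).toDF("id", "gen", "name", "surname", "supername")

    val corrections = Seq(
      (1, 1, "supername", "SUPERMAN"),
      (1, 1, "name", "Clark"),
      (2, 1, "surname", "Wayne")
    ).toDF("id", "gen", "attribute", "value")

    applyCorrections(heroes, corrections).orderBy("id").show()
    spark.stop()
  }
}
```

Run against the sample data, this should yield the three rows of the expected output. The struct-based collection is used instead of two separate collect_list calls because Spark does not document that two independent collect_list aggregations return their elements in the same order.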