在spark中的groupby之后跨列收集大多数出现的唯一值

trnvg8h3  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(399)

我有以下Dataframe

val input = Seq(("ZZ","a","a","b","b"),
("ZZ","a","b","c","d"),
("YY","b","e",null,"f"),
("YY","b","b",null,"f"),
("XX","j","i","h",null))
.toDF("main","value1","value2","value3","value4")
input.show()

+----+------+------+------+------+
|main|value1|value2|value3|value4|
+----+------+------+------+------+
|  ZZ|     a|     a|     b|     b|
|  ZZ|     a|     b|     c|     d|
|  YY|     b|     e|  null|     f|
|  YY|     b|     b|  null|     f|
|  XX|     j|     i|     h|  null|
+----+------+------+------+------+

我需要按顺序分组 main 列,并从其余列中为每个列选取两个最常出现的值 main 价值
我做了以下事情

val newdf = input.select('main,array('value1,'value2,'value3,'value4).alias("values"))
val newdf2 = newdf.groupBy('main).agg(collect_set('values).alias("values"))
val newdf3 = newdf2.select('main, flatten($"values").alias("values"))

获取以下形式的数据

+----+--------------------+
|main|              values|
+----+--------------------+
|  ZZ|[a, a, b, b, a, b...|
|  YY|[b, e,, f, b, b,, f]|
|  XX|          [j, i, h,]|
+----+--------------------+

现在我需要从列表中选取出现次数最多的两个项目作为两列。不知道怎么做。
所以,在这种情况下,预期的输出应该是

+----+------+------+
|main|value1|value2|
+----+------+------+
|  ZZ|     a|     b|
|  YY|     b|     f|
|  XX|     j|     i|
+----+------+------+
``` `null` 不应计算,最终值应 `null` 仅当没有其他值可填充时
这是最好的做事方式吗?有更好的方法吗?
jq6vz3qz

jq6vz3qz1#

可以使用自定义项从数组中选择最常出现的两个值。

input.withColumn("values", array("value1", "value2", "value3", "value4"))
  .groupBy("main").agg(flatten(collect_list("values")).as("values"))
  .withColumn("max", maxUdf('values)) //(1)
  .cache() //(2)
  .withColumn("value1", 'max.getItem(0))
  .withColumn("value2", 'max.getItem(1))
  .drop("values", "max")
  .show(false)

maxUdf 定义为

def getMax[T](array: Seq[T]) = {
  array
    .filter(_ != null) //remove null values
    .groupBy(identity).mapValues(_.length) //count occurences of each value
    .toSeq.sortWith(_._2 > _._2) //sort (3)
    .map(_._1).take(2) //return the two (or one) most common values
}
val maxUdf = udf(getMax[String] _)

评论:
在这里使用一个udf意味着整个数组的所有条目都是一个值 main 必须放进一个Spark执行器的记忆里 cache 否则udf将被调用两次,一次用于 value1 就一次 value2 这个 sortWith 这里是稳定的,但是如果两个元素的出现次数相同(比如 i , j 以及 h 对于主值 XX )

6ojccjat

6ojccjat2#

这是我的尝试没有自定义项。

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('main).orderBy('count.desc)

newdf3.withColumn("values", explode('values))
  .groupBy('main, 'values).agg(count('values).as("count"))
  .filter("values is not null")
  .withColumn("target", concat(lit("value"), lit(row_number().over(w))))
  .filter("target < 'value3'")
  .groupBy('main).pivot('target).agg(first('values)).show

+----+------+------+
|main|value1|value2|
+----+------+------+
|  ZZ|     a|     b|
|  YY|     b|     f|
|  XX|     j|  null|
+----+------+------+

最后一行的值为空,因为我用这种方式修改了您的Dataframe,

+----+--------------------+
|main|              values|
+----+--------------------+
|  ZZ|[a, a, b, b, a, b...|
|  YY|[b, e,, f, b, b,, f]|
|  XX|              [j,,,]| <- For null test
+----+--------------------+

相关问题