分区上的spark scalaDataframe函数

bihw5rsg  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(366)

我有2亿排1千组像这样

Group     X             Y             Z          Q           W
group1  0.054464866 0.002248819 0.299069804 0.763352879 0.395905106
group2  0.9986218   0.023649037 0.50762069  0.212225807 0.619571705
group1  0.839928517 0.290339179 0.050407454 0.75837838  0.495466007
group1  0.021003132 0.663366686 0.687928832 0.239132224 0.020848608
group1  0.393843426 0.006299292 0.141103438 0.858481036 0.715860852
group2  0.045960198 0.014858905 0.672267793 0.59750871  0.893646818

我想运行相同的函数(比如 linear regressionX[X, Z, Q, W] )对于每个组。我本可以做到的 Window.partition 但我有自己的功能。目前,我做了以下工作:

df.select("Group").distinct.collect.toList.foreach{group => 
val dfGroup = df.filter(col("Group")===group
dfGroup.withColumn("res", myUdf(col("X"), col("Y"), col("Z"), col("Q"), col("W"))}

不知有没有更好的办法?

oaxa6hgo

oaxa6hgo1#

根据您的喜好,您至少有两个选项:dataframe或dataset。

带udaf的Dataframe

df
  .groupBy("group")
  .agg(myUdaf(col("col1"), col("col2")))

哪里 myUdaf 是udaf吗
您可以在这里找到如何实现udaf的示例:https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

数据集

你可以用 groupByKey 以及 mapGroups 数据集api的转换:

ds
  .groupByKey(_.group)
  .mapGroups{case (group, values) =>
    (group, aggregator(values))
  }

哪里 aggregator scala函数负责聚合对象集合。
如果你不需要聚合,你可以直接Map values 使用 map 转换,示例:

values.map(v => fun(...))

相关问题