分区上的spark scalaDataframe函数

bihw5rsg  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(414)

我有2亿排1千组像这样

  1. Group X Y Z Q W
  2. group1 0.054464866 0.002248819 0.299069804 0.763352879 0.395905106
  3. group2 0.9986218 0.023649037 0.50762069 0.212225807 0.619571705
  4. group1 0.839928517 0.290339179 0.050407454 0.75837838 0.495466007
  5. group1 0.021003132 0.663366686 0.687928832 0.239132224 0.020848608
  6. group1 0.393843426 0.006299292 0.141103438 0.858481036 0.715860852
  7. group2 0.045960198 0.014858905 0.672267793 0.59750871 0.893646818

我想运行相同的函数(比如 linear regressionX[X, Z, Q, W] )对于每个组。我本可以做到的 Window.partition 但我有自己的功能。目前,我做了以下工作:

  1. df.select("Group").distinct.collect.toList.foreach{group =>
  2. val dfGroup = df.filter(col("Group")===group
  3. dfGroup.withColumn("res", myUdf(col("X"), col("Y"), col("Z"), col("Q"), col("W"))}

不知有没有更好的办法?

oaxa6hgo

oaxa6hgo1#

根据您的喜好,您至少有两个选项:dataframe或dataset。

带udaf的Dataframe

  1. df
  2. .groupBy("group")
  3. .agg(myUdaf(col("col1"), col("col2")))

哪里 myUdaf 是udaf吗
您可以在这里找到如何实现udaf的示例:https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

数据集

你可以用 groupByKey 以及 mapGroups 数据集api的转换:

  1. ds
  2. .groupByKey(_.group)
  3. .mapGroups{case (group, values) =>
  4. (group, aggregator(values))
  5. }

哪里 aggregator scala函数负责聚合对象集合。
如果你不需要聚合,你可以直接Map values 使用 map 转换,示例:

  1. values.map(v => fun(...))
展开查看全部

相关问题