如何将带有多个参数的自定义函数应用于Dataframe的每组,并在scala spark中合并生成的Dataframe?

fdbelqdn  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(428)

我有一个定制的函数,它看起来像这样,返回一个不同的Dataframe作为输出

def customizedfun(data : DataFrame, param1 : Boolean, param2 : string) : DataFrame = {...}

我想把这个函数应用到每一组

df.groupBy("type")

然后从每个 type 一个Dataframe。
这与有关将自定义函数应用于分组Dataframe的其他问题稍有不同,因为除了所讨论的Dataframe之外,此函数还接受其他输入 df.groupBy("type") .
最好的办法是什么?

piztneat

piztneat1#

你可以过滤掉原稿 df 给不同的小组打电话 customizedfun 然后将结果合并。
我想 customizedfun 是一个简单地将两个参数添加为新列的函数,但它可以是任何函数:

def customizedfun(data : DataFrame, param1 : Boolean, param2 : String) : DataFrame =
  data.withColumn("newCol", lit(s"$param2 $param1"))

我需要两个助手函数来计算 param1 以及 param2 取决于 type . 在实际应用程序中,这些函数可以是字典的查找。

def calcParam1(typ: Integer): Boolean = typ % 2 == 0
def calcParam2(typ: Integer): String = s"type is $typ"

现在是原来的 df 被分成不同的组, customizedfun 调用并合并结果:

//create some test data
val df = Seq((1, "A", "a"), (1, "B", "b"), (1, "C", "c"), (2, "D", "d"), (2, "E", "e"), (3, "F", "f"))
  .toDF("type", "val1", "val2")
//+----+----+----+
//|type|val1|val2|
//+----+----+----+
//|   1|   A|   a|
//|   1|   B|   b|
//|   1|   C|   c|
//|   2|   D|   d|
//|   2|   E|   e|
//|   3|   F|   f|
//+----+----+----+

//get the distinct values of column type
val distinctTypes = df.select("type").distinct().as[Integer].collect()

//call customizedfun for each group
val resultPerGroup= for( typ <- distinctTypes)
  yield customizedfun( df.filter(s"type = $typ"), calcParam1(typ), calcParam2(typ))

//the final union
val result = resultPerGroup.tail.foldLeft(resultPerGroup.head)(_ union _)

//+----+----+----+---------------+
//|type|val1|val2|         newCol|
//+----+----+----+---------------+
//|   1|   A|   a|type is 1 false|
//|   1|   B|   b|type is 1 false|
//|   1|   C|   c|type is 1 false|
//|   3|   F|   f|type is 3 false|
//|   2|   D|   d| type is 2 true|
//|   2|   E|   e| type is 2 true|
//+----+----+----+---------------+

相关问题