Scala Spark: extracting shared logic into a function and using it in an aggregation

2ledvvac posted on 2022-11-29 in Scala

Here is a function that computes some aggregations:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

def compute(spark: SparkSession,
            mydf: DataFrame): DataFrame = {

  mydf
    .groupBy(col("col1"), col("col2"))
    .agg(
      // count purchase_date only on rows where col5 matches the string
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name1"),
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name2"),
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name3"),
      count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name4"),

      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name10"),
      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name11"),
      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name12"),
      count(when(col("col5") === "some_string1", col("purchase_date"))).as("name13")
    )
}

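As you can see, this function has a repeating pattern (I've shown 2 instances, but in reality there are more than 10). Since this is duplicated code, I'd like to extract the common logic into a function, something like this (pseudocode):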

def compute(spark: SparkSession,
            mydf: DataFrame): DataFrame = {

  mydf
    .groupBy(col("col1"), col("col2"))
    .agg(
      func("col5", "some_string_to_check", "purchase_date", Array("name1", "name2", "name3", "name4")),
      func("col5", "some_string1", "purchase_date", Array("name10", "name11", "name12", "name13"))
    )
}

// Pseudocode: should produce all four aggregate columns at once
def func(col_name: String, string_to_compate: String, date_col: String, array_of_name_results: Array[String]) = {
  count(when(col(col_name) === string_to_compate, col(date_col))).as(array_of_name_results(0)),
  count(when(col(col_name) === string_to_compate, col(date_col))).as(array_of_name_results(1)),
  count(when(col(col_name) === string_to_compate, col(date_col))).as(array_of_name_results(2)),
  count(when(col(col_name) === string_to_compate, col(date_col))).as(array_of_name_results(3))
}

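Can this be done? I couldn't find any reference on whether a function can be used to build the logic passed to .agg(). Any help with this would be appreciated.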

Answer #1 by 6kkfgxo0:

If you look at the signature of agg, you'll see:

def agg(expr: Column, exprs: Column*): DataFrame
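Because the first aggregate is a separate parameter and the rest are varargs, a List[Column] cannot be handed to agg directly; it has to be split into its head and its tail, with the tail expanded via ": _*". The plain-Scala sketch below mimics that parameter shape with a hypothetical aggLike method (not a Spark API) and a hypothetical names helper, so the splitting can be checked without a Spark cluster.

```scala
object VarargsSplitDemo {
  // Hypothetical stand-in with the same shape as agg(expr: Column, exprs: Column*)
  def aggLike(expr: String, exprs: String*): List[String] = expr :: exprs.toList

  // Hypothetical helper that returns several items at once, like a shared func would
  def names(prefix: String, n: Int): List[String] =
    (1 to n).map(i => s"$prefix$i").toList

  def main(args: Array[String]): Unit = {
    // Results of several helper calls are concatenated into one list
    val all = names("a", 2) ++ names("b", 2)
    // head fills the single parameter, tail is expanded into the varargs
    val passed = aggLike(all.head, all.tail: _*)
    println(passed.mkString(",")) // prints a1,a2,b1,b2
  }
}
```

If the list can be empty, all.head throws NoSuchElementException, so the same caveat applies when calling agg this way.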

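Based on this signature, we can build a list of aggregate Column expressions and pass it in, with the head as the first argument and the tail expanded as varargs (tail: _*):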

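import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

def calculate(spark: SparkSession,
              mydf: DataFrame): DataFrame = {
  // One conditional count per output name; the parameter `col` shadows functions.col, hence the qualified call
  def func(col: Column, string_to_compate: String, date_col: String, array_of_name_results: Array[String]): List[Column] =
    array_of_name_results.toList.map(name =>
      count(when(col === string_to_compate, org.apache.spark.sql.functions.col(date_col))).as(name))

  val aggregates =
    func(col("col5"), "some_string_to_check", "purchase_date", Array("name1", "name2", "name3", "name4")) ++
      func(col("col5"), "some_string1", "purchase_date", Array("name10", "name11", "name12", "name13"))

  mydf
    .groupBy(col("col1"), col("col2"))
    .agg(aggregates.head, aggregates.tail: _*)
}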

It isn't as elegant as a plain method call, but it does allow the logic bundled in func to be reused.
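With more than ten such patterns, the match values and output names could themselves be kept in a small table and expanded with flatMap, so that agg receives one flat Seq[Column]. This is a sketch under assumptions, not the answerer's code: it assumes Spark 3.x on Scala 2.12/2.13 with a local session, and countWhen, spec, and the sample rows are hypothetical names and data chosen only for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

object SharedAggDemo {
  // Hypothetical helper: one conditional count of dateCol per output name
  def countWhen(matchCol: String, value: String, dateCol: String, names: Seq[String]): Seq[Column] =
    names.map(name => count(when(col(matchCol) === value, col(dateCol))).as(name))

  // Hypothetical spec table: value to match in col5 -> output column names
  val spec: Seq[(String, Seq[String])] = Seq(
    "some_string_to_check" -> Seq("name1", "name2", "name3", "name4"),
    "some_string1" -> Seq("name10", "name11", "name12", "name13")
  )

  def compute(mydf: DataFrame): DataFrame = {
    // Expand every spec entry into its aggregate columns, in table order
    val aggregates = spec.flatMap { case (value, names) =>
      countWhen("col5", value, "purchase_date", names)
    }
    mydf
      .groupBy(col("col1"), col("col2"))
      .agg(aggregates.head, aggregates.tail: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("shared-agg").getOrCreate()
    import spark.implicits._
    // Hypothetical sample rows
    val df = Seq(
      ("a", "x", "some_string_to_check", "2022-11-01"),
      ("a", "x", "some_string1", "2022-11-02"),
      ("a", "x", "other", "2022-11-03")
    ).toDF("col1", "col2", "col5", "purchase_date")
    compute(df).show()
    spark.stop()
  }
}
```

Adding another pattern then means adding one entry to spec rather than another block of count(when(...)) lines.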
