下面是一个计算聚合的函数:
def compute(spark: SparkSession,
mydf: DataFrame): DataFrame = {
mydf
.groupBy(col("col1"), col("col2")
.agg(
count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name1"),
count(when(col("col5") === "some_string_to_check", col("purchase_date"))).as("name2"),
count(when(col("col5") === "some_string_to_check"), col("purchase_date"))).as("name3"),
count(when(col("col5") === "some_string_to_check"), col("purchase_date"))).as("name4"),
count(when(col("col5") === "some_string1", col("purchase_date"))).as("name10"),
count(when(col("col5") === "some_string1", col("purchase_date"))).as("name11"),
count(when(col("col5") === "some_string1"), col("purchase_date"))).as("name12"),
count(when(col("col5") === "some_string1"), col("purchase_date"))).as("name13")
)
}
如您所见,此函数具有重复模式(我显示了2个,但实际上有10多个),因为这是重复代码,所以我希望将公共逻辑提取到函数中。类似于(伪代码):
def compute(spark: SparkSession,
mydf: DataFrame): DataFrame = {
mydf
.groupBy(col("col1"), col("col2")
.agg(
func("col5", "some_string_to_check", "purchase_date", ["name1", "name2", "name3", "name4"]),
func("col5", "some_string1", "purchase_date", ["name10", "name11", "name12", "name13"])
)
}
def func(col, string_to_compate, date_col, array_of_name_results) = {
count(col === string_to_compate, col(date_col))).as(array_of_name_results[0]),
count(when(col === string_to_compate, col(date_col))).as(array_of_name_results[1]),
count(when(col === string_to_compate), col(date_col))).as(array_of_name_results[2]),
count(when(col === string_to_compate), col(date_col))).as(array_of_name_results[3]),
}
这可以做到吗?我找不到任何参考,如果它是可能的使用函数来创建.agg()
的逻辑?任何帮助在这方面是感激。
1条答案
按热度按时间6kkfgxo01#
如果考虑
agg
的函数签名,您将看到:在此基础上,我们可以创建一个聚合
Column
表达式的列表,并将其传入:它看起来不像纯方法调用那么优雅,但确实允许重用
func
中捆绑的逻辑。