spark:需要确认捕获数据集上第一个和最后一个日期的方法

lnlaulya  于 2021-05-26  发布在  Spark
关注(0)|答案(1)|浏览(353)

我有一个数据框:

A, B, C, D, 201701, 2020001
A, B, C, D, 201801, 2020002
A, B, C, D, 201901, 2020003

预期产量:

col_A, col_B, col_C ,col_D, min_week ,max_week, min_month, max_month
A,         B,     C,     D,    201701,  201901,  2020001,  2020003

我在Pypark的尝试-

from pyspark.sql import Window
import pyspark.sql.functions as psf

w1 = Window.partitionBy('A','B', 'C', 'D')\
.orderBy('WEEK','MONTH')
df_new = df_source\
.withColumn("min_week", psf.first("WEEK").over(w1))\
.withColumn("max_week", psf.last("WEEK").over(w1))\
.withColumn("min_month", psf.first("MONTH").over(w1))\
.withColumn("max_month", psf.last("MONTH").over(w1))

我也试过了-

sql_1 = """
select A, B , C, D, first(WEEK) as min_week, 
last(WEEK) as max_week , first(MONTH) as min_month, 
last(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)

使用第一和第二种方法,我得到了不一致的结果。下面的方法能解决上面遇到的问题吗-

sql_1 = """
select A, B , C, D, min(WEEK) as min_week, 
max(WEEK) as max_week , min(MONTH) as min_month, 
max(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)

哪种方法每次在pyspark中都非常有效?有别的办法吗
或者,第三种选择是否是处理此需求的最佳方法。
任何提示都会有帮助。

uqcuzwp8

uqcuzwp81#

你提出的第三种方法每次都会奏效。你也可以这样写:

df
    .groupBy('A', 'B', 'C', 'D')
    .agg(F.min('WEEK').alias('min_week'), F.max('WEEK').alias('max_week'),
         F.min('MONTH').alias('min_month'), F.max('MONTH').alias('max_month'))
    .show()

由此产生:

+---+---+---+---+--------+--------+---------+---------+
|  A|  B|  C|  D|min_week|max_week|min_month|max_month|
+---+---+---+---+--------+--------+---------+---------+
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
+---+---+---+---+--------+--------+---------+---------+

理解为什么前两种方法产生不可预测的结果而第三种方法总是有效是很有趣的。
第二种方法是不可预测的,因为spark是一个并行计算引擎。当它聚合一个值时,它首先聚合所有分区中的值,然后结果将按二乘二进行聚合。然而,这些聚合的顺序并不确定。它取决于任务的完成顺序,每次尝试都会发生变化,特别是在数据量很大的情况下。
第一种方法并不完全是你想做的。窗口函数不会将Dataframe聚合到一行中。他们将计算聚合并将其添加到每一行。你也犯了几个错误。如果对Dataframe进行排序,默认情况下spark会考虑从窗口开始到当前行的窗口。因此,最大值将是本周的当前行。实际上,要计算in和max,不需要对Dataframe进行排序。你可以这样做:

w = Window.partitionBy('A','B', 'C', 'D')
df.select('A', 'B', 'C', 'D',
    F.min('WEEK').over(w).alias('min_week'),
    F.max('WEEK').over(w).alias('max_week'),
    F.min('MONTH').over(w).alias('min_month'),
    F.max('MONTH').over(w).alias('max_month')
).show()

结果是正确的,但这不是你所期望的。但至少,您看到了窗口聚合和常规聚合之间的区别。

+---+---+---+---+--------+--------+---------+---------+
|  A|  B|  C|  D|min_week|max_week|min_month|max_month|
+---+---+---+---+--------+--------+---------+---------+
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
+---+---+---+---+--------+--------+---------+---------+

相关问题