聚合一列,但在select中显示所有列

ryevplcw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(412)

我试图显示列中的最大值,同时按日期列对行进行分组。
所以我试过这个密码

maxVal = dfSelect.select('*')\
            .groupBy('DATE')\
            .agg(max('CLOSE'))

但是输出看起来是这样的:

+----------+----------+
|      DATE|max(CLOSE)|
+----------+----------+
|1987-05-08|     43.51|
|1987-05-29|    39.061|
+----------+----------+

我想有如下输出

+------+---+----------+------+------+------+------+------+---+----------+
|TICKER|PER|      DATE|  TIME|  OPEN|  HIGH|   LOW| CLOSE|VOL|max(CLOSE)|
+------+---+----------+------+------+------+------+------+---+----------+
|   CDG|  D|1987-01-02|000000|50.666|51.441|49.896|50.666|  0|    50.666|
|   ABC|  D|1987-01-05|000000|51.441| 52.02|51.441|51.441|  0|    51.441|
+------+---+----------+------+------+------+------+------+---+----------+

因此,我的问题是如何更改代码,使其具有所有列的输出和聚合的“close”列?
我的数据方案如下:

root
 |-- TICKER: string (nullable = true)
 |-- PER: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- TIME: string (nullable = true)
 |-- OPEN: float (nullable = true)
 |-- HIGH: float (nullable = true)
 |-- LOW: float (nullable = true)
 |-- CLOSE: float (nullable = true)
 |-- VOL: integer (nullable = true)
 |-- OPENINT: string (nullable = true)
kqqjbcuj

kqqjbcuj1#

如果您希望对原始Dataframe中的所有列进行相同的聚合,那么可以执行以下操作:,

import pyspark.sql.functions as F
expr = [F.max(coln).alias(coln) for coln in df.columns if 'date' not in coln] # df is your datafram
df_res = df.groupby('date').agg(*expr)

如果你想要多个聚合,那么你可以这样做,

sub_col1 = # define
sub_col2=# define
expr1 = [F.max(coln).alias(coln) for coln in sub_col1 if 'date' not in coln]
expr2 = [F.first(coln).alias(coln) for coln in sub_col2 if 'date' not in coln]
expr=expr1+expr2
df_res = df.groupby('date').agg(*expr)

如果只希望聚合其中一列并将其添加到原始Dataframe中,那么可以在聚合之后执行selfjoin

df_agg = df.groupby('date').agg(F.max('close').alias('close_agg')).withColumn("dummy",F.lit("dummmy")) # dummy column is needed as a workaround in spark issues of self join
df_join = df.join(df_agg,on='date',how='left')

也可以使用窗口功能

from pyspark.sql import Window
w= Window.partitionBy('date')
df_res = df.withColumn("max_close",F.max('close').over(w))

相关问题