Creating columns in a PySpark DataFrame inside a loop using a filter

Posted by ecbunoof on 2022-11-21 in Spark

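I want to create columns for each element of the list "weeks" and collect them all in a single DataFrame. The DataFrame "df" is filtered on "weeknum", and then the columns are created. When I run the code, the final DataFrame only contains the information for the last "weeknum". How can I create the columns for every "weeknum", for example by left-joining the results?
I tried this: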

weeks = [24, 25]
for weeknum in weeks:
    df_new = df.filter(df.week == weeknum).groupBy(['gender', 'pro']).pivot("share").agg(first('forecast_units')) \
        .withColumnRenamed('0.01', 'units_1_share_wk'+str(weeknum))\
        .withColumnRenamed('0.1', 'units_10_share_wk'+str(weeknum))\
        .withColumnRenamed('0.15', 'units_15_share_wk'+str(weeknum))\
        .withColumnRenamed('0.2', 'units_20_share_wk'+str(weeknum)) 
df_new.show()

But this only returns the DataFrame for the last "weeknum" in "weeks".
The original DataFrame "df" looks like this:

|country|gender|order_date|         pro|share|        prediction|week|dayofweek|forecast_units|
+-------+------+----------+------------+-----+------------------+----+---------+--------------+
|     ES|  Male|2022-09-15|Jeans - Flat| 0.01|13.322306632995605|  37|        5|          93.0|
|     ES|  Male|2022-09-15|Jeans - Flat|  0.1| 19.09369468688965|  37|        5|         134.0|
|     ES|  Male|2022-09-15|Jeans - Flat| 0.15|22.504554748535156|  37|        5|         158.0|

I would like the final DataFrame to have the following structure:

|gender|pro|units_1_tpr_wk24|units_10_tpr_wk24|units_15_tpr_wk24|units_20_tpr_wk24|units_1_tpr_wk25|units_10_tpr_wk25|units_15_tpr_wk25|units_20_tpr_wk25|

Expected output:

|gender|pro|units_1_tpr_wk24|units_10_tpr_wk24|units_15_tpr_wk24|units_20_tpr_wk24|units_1_tpr_wk25|units_10_tpr_wk25|units_15_tpr_wk25|units_20_tpr_wk25|
|---|---|---|---|---|---|---|---|---|---|
|Female|Belts|28.0|0.0|0.0|0.0|28.0|0.0|0.0|0.0|
|Female|Dress|0.0|44.0|0.0|0.0|0.0|0.0|0.0|0.0|
|Male|Belts|0.0|0.0|33.0|0.0|28.0|0.0|0.0|0.0|
|Male|Suits|0.0|0.0|0.0|34.0|0.0|0.0|0.0|0.0|
carvr3hs #1

I suggest generating all the required columns first and then passing them to the select function, as follows:

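from pyspark.sql.functions import col, concat_ws, first

weeks = [24, 25]
# Pivoted share value -> label used in the output column name.
share_labels = {'0.01': '1', '0.1': '10', '0.15': '15', '0.2': '20'}

# Generate every output column up front: one per (week, share) pair.
# Backticks are required because the pivoted column names contain dots.
cols_to_select = []
for weeknum in weeks:
    cols_to_select.extend([
        col(f'`{weeknum}_{share}`').alias(f'units_{label}_share_wk{weeknum}')
        for share, label in share_labels.items()
    ])

# Pivot once on a combined week/share key so that no week is overwritten.
# Assumes casting "share" to string yields '0.01', '0.1', '0.15', '0.2'.
pivot_values = [f'{weeknum}_{share}' for weeknum in weeks for share in share_labels]
df_new = (
    df.filter(col('week').isin(weeks))
    .withColumn('week_share', concat_ws('_', col('week').cast('string'), col('share').cast('string')))
    .groupBy('gender', 'pro')
    .pivot('week_share', pivot_values)
    .agg(first('forecast_units'))
    .select([col('gender'), col('pro')] + cols_to_select)
)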
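
The loop in the question reassigns df_new on every pass, so only the last week survives. As an alternative sketch that stays closer to that loop, each week's pivoted frame can be kept in a list and the frames joined on "gender" and "pro" afterwards. The helper names week_renames, pivot_week and pivot_weeks are hypothetical, the share values are the ones shown in the question, and the outer join is an assumption (pass "left" instead to keep only the groups present in the first week).

```python
from functools import reduce


def week_renames(weeknum, shares=("0.01", "0.1", "0.15", "0.2")):
    """Map each pivoted share column name to its per-week output name."""
    # '0.01' -> 'units_1_share_wk24', '0.1' -> 'units_10_share_wk24', ...
    return {s: f"units_{round(float(s) * 100)}_share_wk{weeknum}" for s in shares}


def pivot_week(df, weeknum):
    """Pivot one week's rows so that each share value becomes its own column."""
    # Imported here so week_renames can be used without a Spark installation.
    from pyspark.sql import functions as F

    pivoted = (
        df.filter(F.col("week") == weeknum)
        .groupBy("gender", "pro")
        .pivot("share")
        .agg(F.first("forecast_units"))
    )
    # withColumnRenamed takes the literal name and is a no-op if it is missing.
    for old, new in week_renames(weeknum).items():
        pivoted = pivoted.withColumnRenamed(old, new)
    return pivoted


def pivot_weeks(df, weeks, how="outer"):
    """Build one pivoted frame per week and join them all on the group keys."""
    frames = [pivot_week(df, weeknum) for weeknum in weeks]
    return reduce(lambda left, right: left.join(right, ["gender", "pro"], how), frames)
```

Calling pivot_weeks(df, [24, 25]) should then give one row per gender/pro pair with the wk24 and wk25 columns side by side. Groups that are missing in a week come back as null, which .fillna(0.0) can turn into the zeros shown in the expected output.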
