pyspark-对列列表应用一个udf并返回几个Dataframe

6psbrbz9  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(415)

我创造了一个 udf ,其目的是 ffill 以及 bfill 一列,并返回一个新的插补 dataframe . 错误不在函数中,因为它工作正常。
请看下面我的功能:

def ffill_bfill(df,partition_by_col,order_by_col,col_to_imp):

    '''Forward fill and Backward fill a column by a column/set of columns (order_col).  
    Parameters:
    ------------
    df: Dataframe that the columns are in (Company wide? Company Narrow?) 
    order_col: String or List of string. This is the Year column until we get more granular time data!!
    fill_col: String (Only work for a column). The name of the column to be imputed!!

    Return:
    ---------
    df: Dataframe 
        Return df with the filled_cols. 
    '''

    # create the series containing the forward filled values             
    window_ff = Window.partitionBy(partition_by_col).orderBy(order_by_col).rowsBetween(-sys.maxsize, 0)

    # create the series containing the backward filled values  
    window_bf = Window.partitionBy(partition_by_col).orderBy(order_by_col).rowsBetween(0, sys.maxsize)

    # create the series containing the BACKWARD filled values for the two columns 
    s_bf = func.first(df[col_to_imp], ignorenulls=True).over(window_bf)

    # create the series containing the FORWARD filled values for the two columns
    s_ff = func.last(df[col_to_imp], ignorenulls=True).over(window_ff)

    # add the IMPUTED column to a dataframe 
    imputed_df = df_company_wide.withColumn(f'{col_to_imp}_bf', s_bf)\
                                .withColumn(f'{col_to_imp}_ff', s_ff)

    # Fill in the nulls with the imputed values
    imputed_df = imputed_df.withColumn(f'{col_to_imp}_imp',coalesce(col_to_imp,f'{col_to_imp}_ff',f'{col_to_imp}_bf'))

    # Create the imputed dataframes
    cols_to_use = ['isin','company','year',col_to_imp]+[s for s in imputed_df.columns if col_to_imp in s and 'imp' in s]
    imputed_df_final = imputed_df.select(cols_to_use)

    return imputed_df_final

问题在于我应用函数的方式:
我的意图是在4列中应用函数,并返回4个插补的Dataframe。我试着用下面的代码来实现这一点:


# Get the columns to be imputed in a list

features_to_impute = ['mobile_maximum_plan_for_one',
                     'mobile_minimum_plan_for_one',
                     'slowest_internet_speed',
                     'fastest_internet_speed']

# Return a dataframe and make available for SQL

for feature in features_to_impute:
  f"{feature}_imp"= ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp=f"'{feature}'")  
  f"{feature}_imputed".createOrReplaceTempView(f"{feature}_imputed")

当我运行上面的命令时,我得到一个错误:

SyntaxError: can't assign to literal
  File "<command-575233896480136>", line 21
    f"{feature}_imp"= ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp=f"'{feature}'")
    ^
SyntaxError: can't assign to literal

但当我尝试一次在一列上应用该函数时(如下所示),它是有效的:

mobile_maximum_plan_for_one_imputed = ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp='mobile_maximum_plan_for_one')
mobile_minimum_plan_for_one_imputed.show()

+------------+----------------+------+---------------------------+-------------------------------+
|        isin|         company|  year|mobile_minimum_plan_for_one|mobile_minimum_plan_for_one_imp|
+------------+----------------+------+---------------------------+-------------------------------+
|BE0003810273|        Proximus|2015.0|                       null|              11.19820828667413|
|BE0003810273|        Proximus|2016.0|                       null|              11.19820828667413|
|BE0003810273|        Proximus|2017.0|                       null|              11.19820828667413|
|BE0003810273|        Proximus|2018.0|                       null|              11.19820828667413|
|BE0003810273|        Proximus|2019.0|          11.19820828667413|              11.19820828667413|
|CH0008742519|        Swisscom|2015.0|                       null|                          29.82|
|CH0008742519|        Swisscom|2016.0|                       null|                          29.82|
|CH0008742519|        Swisscom|2017.0|                       null|                          29.82|
|CH0008742519|        Swisscom|2018.0|                      29.82|                          29.82|
|CH0008742519|        Swisscom|2019.0|                      29.82|                          29.82|

有人能解释一下如何修复for循环,从而成功地将4个不同的Dataframe和插补值一起带回吗?一个好的解释会增加很多价值!
非常感谢。

6l7fqoea

6l7fqoea1#

您的代码无法运行,因为您正在将Dataframe分配给字符串而不是变量。在任何情况下,使用变量名都不是一个好的做法。为此你可以考虑使用词典。

features = dict()
for feature in features_to_impute:
  features[f"{feature}_imp"] = ffill_bfill(df_company_wide,partition_by_col='isin',order_by_col='year',col_to_imp=f"'{feature}'")  
  features[f"{feature}_imputed"].createOrReplaceTempView(f"{feature}_imputed")

相关问题