Porting a Python lambda function to PySpark

w8rqjzmb · asked on 2022-11-28 · in Spark
**Python:**

I have a DataFrame to which I am applying a lambda function that checks a condition based on column values.

In pandas it looks like this (example):

new_df = df1.merge(df2, how='left', left_on='lkey', right_on='rkey')

  lkey value_x rkey value_y col1 col2 col3 col4 col5 
0  foo     one  foo    five  0    1     3    0   5    
1  foo     one  foo     NaN  1    0     2    4   0    
2  bar     two  bar     six  2    6     3    0   0    
3  foo    five  foo    five  7    2     0    0   0    
4  foo    five  foo     NaN  2    0     0    0   0   
5  bbb    four  bar    two   0    0     0    0   0      

def get_final_au(row):
    if row['col5'] == 0: 
        if row['col4'] == 0: 
            if row['col3'] == 0: 
                if row['col2'] == 0: 
                    return 'NOT FOUND'
                else:
                    return row['col2']
            else:
                return row['col3']
        else:
            return row['col4']
    else:
         return row['col5']

new_df['col6'] = new_df.apply(lambda row: get_final_au(row), axis=1)

Expected Output:

  lkey value_x rkey value_y col1 col2 col3 col4 col5 col6
0  foo     one  foo    five  0    1     3    0   5    5
1  foo     one  foo     NaN  1    0     2    4   0    4
2  bar     two  bar     six  2    6     3    0   0    3
3  foo    five  foo    five  7    2     0    0   0    2
4  foo    five  foo     NaN  2    0     0    0   0   NOT FOUND
5  bbb    four  bar    two   0    0     0    0   0   NOT FOUND
**PySpark:**

How would I do something similar in PySpark?


I have tried the following, but I get an error. Please advise.

from pyspark.sql.functions import udf
def get_final_au(row):
    if row['col5'] != 0:
        return row['col5']
    elif row['col4'] != 0:
        return row['col4']
    elif row['col3'] != 0:
        return row['col3']
    elif row['col2'] != 0:
        return row['col2']
    else:
        return 'NOT FOUND'
UDF_NAME = udf(lambda row: get_final_au(row), StringType())
new_df.withColumn('col6', UDF_NAME('col5','col4','col3','col2')).show(2,False)

5cnsuln71#

I think you can use either a UDF or a when clause. The when clause would be easier.

    • The syntax for a **UDF** is as follows:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def function_name(arg):
    # logic goes here
    return ...  # return value

# Register the UDF; the second argument is the *return* type, e.g. StringType()
UDF_NAME = udf(function_name, StringType())

df.select(UDF_NAME('col').alias('new_col'))

For a when clause:

df.withColumn("new_column", when(condition1, value).when(condition2, value).otherwise(value))

bzzcjhmw2#

Possible duplicate of: Apply a function to a single column of a csv in Spark

Suggestion:

Modify get_final_au to:

def get_final_au(row):
    if row['col2'] != 0:
        return row['col2']
    elif row['col3'] != 0:
        return row['col3']
    elif row['col4'] != 0:
        return row['col4']
    elif row['col5'] != 0:
        return row['col5']
    else:
        return 'NOT FOUND'
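
To call a function like this from PySpark it still has to be registered as a UDF and handed the columns it reads. A minimal sketch, assuming the new_df from the question, packing the columns into a struct so the row['colN'] lookups keep working, and wrapping the result in str() so it matches the declared StringType:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# A struct column arrives in the Python function as a Row, so row['col2']
# etc. still work. str() keeps both the numeric and the 'NOT FOUND'
# branches consistent with the declared StringType.
get_final_au_udf = udf(lambda row: str(get_final_au(row)), StringType())

new_df = new_df.withColumn(
    "col6",
    get_final_au_udf(F.struct("col2", "col3", "col4", "col5")),
)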

8iwquhpp3#

UDFs are a last resort; they are quite slow. You can get the same result with the following code:

from pyspark.sql.functions import col, when

cols = ['col1', 'col2', 'col3', 'col4', 'col5']
new_df.select(*[when(col(x) == 0, 'NOT FOUND').otherwise(col(x)).alias(x) for x in cols]).show()
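
The select above handles each column independently; if the single col6 from the question's expected output is wanted, one UDF-free way is coalesce over when expressions, since a when with no otherwise yields NULL and coalesce falls through to the next column. A sketch, assuming string output so 'NOT FOUND' and the numbers share one type:

from pyspark.sql.functions import coalesce, when, col, lit

# Sketch: first non-zero column in priority order, else 'NOT FOUND'.
priority = ['col5', 'col4', 'col3', 'col2']
new_df.withColumn(
    'col6',
    coalesce(
        *[when(col(c) != 0, col(c).cast('string')) for c in priority],
        lit('NOT FOUND'),
    ),
).show()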
