我需要创建基于三个dataframe字段的新字段。这是有效的,但似乎效率低下:
def my_func(very_long_field_name_a, very_long_field_name_b, very_long_field_name_c):
if very_long_field_name_a >= very_long_field_name_b and very_long_field_name_c <= very_long_field_name_b:
return 'Y'
elif very_long_field_name_a <= very_long_field_name_b and very_long_field_name_c >= very_long_field_name_b:
return 'Y'
else:
return 'N'
import pyspark.sql.functions as F
my_udf = F.udf(my_func)
df.withColumn('new_field', my_udf(df.very_long_field_name_a, df.very_long_field_name_b, df.very_long_field_name_c)).display()
有没有可能像这样传递 Dataframe ?我试了一下,但出现了一个错误:
def my_func(df):
if df.very_long_field_name_a >= df.very_long_field_name_b and df.very_long_field_name_c <= df.very_long_field_name_b:
return 'Y'
df.elif very_long_field_name_a <= df.very_long_field_name_b and df.very_long_field_name_c >= df.very_long_field_name_b:
return 'Y'
else:
return 'N'
import pyspark.sql.functions as F
my_udf = F.udf(my_func)
df.withColumn('new_field', my_udf(df)).display()
Invalid argument, not a string or column:
我想缩短它的原因是因为我已经创建了六个新字段。复制和粘贴所有作为参数传递的字段名似乎效率很低,所以我想知道是否有更干净的方法。
1条答案
按热度按时间a0x5cqrl1#
要基于DataFrame中的多个列创建新字段,而不显式地将每个列作为参数传递给UDF,可以使用PySpark中的
struct
函数。struct
函数将多个列合并为一个StructType列。下面是一个例子:在这种方法中,我们使用
struct
函数将必要的列(very_long_field_name_a
,very_long_field_name_b
,very_long_field_name_c
)组合成一个名为combined_fields
的列。然后,我们使用my_udf(F.col('combined_fields'))
将UDF应用于combined_fields
列。最后,我们使用df.drop('combined_fields')
删除临时组合列。通过使用
struct
,可以避免将每一列作为参数显式传递给UDF,从而使代码更简洁、更高效。