PySpark: row-wise operations with a UDF

zbdgwd5y · posted 2023-04-05 in Spark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

# Define the schema for the input DataFrame
input_schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True)
])

# Define the UDF that accepts an entire row as input and performs operations using columns
@udf(returnType=StringType())
def my_udf(row):
    col1 = row.col1
    col2 = row.col2
    result = col1 + col2
    return str(result)

# Create a sample DataFrame
data = [(1, 2), (3, 4), (5, 6)]
df = spark.createDataFrame(data, schema=input_schema)

# Apply the UDF to the DataFrame
result_df = df.withColumn("result", my_udf(df))

I am trying to run the code above, but I get the following error: TypeError: Invalid argument, not a string or column: DataFrame[col1: int, col2: int] of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
Spark version: 3.3.1 on Databricks
Can anyone tell me what I am doing wrong?
I have tried several permutations and combinations, but I cannot get it to work.


2wnc66cl1#

A UDF accepts columns, not a whole DataFrame. You can pass a `struct` of all the columns instead:

from pyspark.sql.functions import struct

# Pack every column into a single struct column, so the UDF
# receives one Row-like argument with .col1 and .col2 attributes
result_df = df.withColumn("result", my_udf(struct([df[col] for col in df.columns])))
result_df.show()

+----+----+------+
|col1|col2|result|
+----+----+------+
|   1|   2|     3|
|   3|   4|     7|
|   5|   6|    11|
+----+----+------+
