UDF function with double data type in PySpark

pkbketx9 · posted 2022-11-01 in Spark
2 answers · 259 views

I am trying to create a column using a UDF in PySpark.
The code I tried looks like this:


# The function checks the year and adds a multiplied value_column to the final column

def new_column(row, year):
    if year == "2020":
        return row * 0.856
    elif year == "2019":
        return row * 0.8566
    else:
        return row

final_udf = F.udf(lambda z, y: new_column(z, y), Double())  # How do I get a Double datatype here?
res = res.withColumn("final_value", final_udf(F.col('value_column'), F.col('year')))

How do I write Double() in final_udf? I know that for strings we can use StringType(), but how do I return a double type in the "final_value" column?

jdgnovmf1#

Input:

from pyspark.sql import functions as F, types as T
res = spark.createDataFrame([(1.0, '2020',), (1.0, '2019',), (1.0, '2018',)], ['value_column', 'year'])
**A udf is very inefficient when processing big data.**

You should first try to do this in native Spark:

res = res.withColumn(
    'final_value',
    F.when(F.col('year') == "2020", F.col('value_column') * 0.856)
     .when(F.col('year') == "2019", F.col('value_column') * 0.8566)
     .otherwise(F.col('value_column'))
)
res.show()

# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
# +------------+----+-----------+

If it cannot be done in native Spark, move on to a **pandas_udf**:

from pyspark.sql import functions as F, types as T
import pandas as pd

@F.pandas_udf(T.DoubleType())
def new_column(row: pd.Series, year: pd.Series) -> pd.Series:
    # A pandas_udf receives whole Series, so a plain `if year == ...`
    # would fail; use vectorized where() instead: keep the value where
    # the condition holds, substitute the multiplied value elsewhere.
    out = row.where(year != "2020", row * 0.856)
    return out.where(year != "2019", row * 0.8566)

res = res.withColumn("final_value", new_column('value_column', 'year'))

res.show()

# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
# +------------+----+-----------+
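The per-year scaling can also be sanity-checked with plain pandas, no Spark session required (a standalone sketch; `apply_year_factor` is a hypothetical name, not part of the PySpark API):

```python
import pandas as pd

# Standalone copy of the vectorized logic: where() keeps values where
# the condition holds and substitutes the multiplied value elsewhere.
def apply_year_factor(row: pd.Series, year: pd.Series) -> pd.Series:
    out = row.where(year != "2020", row * 0.856)
    return out.where(year != "2019", row * 0.8566)

row = pd.Series([1.0, 1.0, 1.0])
year = pd.Series(["2020", "2019", "2018"])
print(apply_year_factor(row, year).tolist())  # [0.856, 0.8566, 1.0]
```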

Only as a last resort should you go for a udf:

@F.udf(T.DoubleType())
def new_column(row, year):
    if year == "2020":
        return row * 0.856 
    elif year == "2019": 
        return row * 0.8566
    else:
        return row

res = res.withColumn("final_value", new_column('value_column', 'year'))

res.show()

# +------------+----+-----------+
# |value_column|year|final_value|
# +------------+----+-----------+
# |         1.0|2020|      0.856|
# |         1.0|2019|     0.8566|
# |         1.0|2018|        1.0|
# +------------+----+-----------+
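Since the wrapped function is plain Python, its logic can be unit-tested locally without any Spark session (an undecorated copy for testing; `year_factor` is a hypothetical name):

```python
# Undecorated copy of the udf body -- callable directly in plain Python.
def year_factor(row, year):
    if year == "2020":
        return row * 0.856
    elif year == "2019":
        return row * 0.8566
    return row

print(year_factor(1.0, "2020"))  # 0.856
print(year_factor(1.0, "2018"))  # 1.0
```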
7rtdyuoh2#

Use the simple string "double", or import pyspark's DoubleType:


# like this
final_udf = F.udf(lambda z, y: new_column(z, y), "double")

# or this
import pyspark.sql.types as T
final_udf = F.udf(lambda z, y: new_column(z, y), T.DoubleType())
