将两个不同的 Dataframe 输入到PySpark UDF,并将输出保存到新的 Dataframe 中

5f0d552i  于 2022-10-07  发布在  Spark
关注(0)|答案(1)|浏览(186)

我正在尝试使用PySpark数据框来使用PYTHON函数。我需要在输入给出两个 Dataframe ,并希望将结果存储在另一个 Dataframe 中。

我想要使用的Python函数:

@udf(StringType())    
def fuzz_ratio(df1, df2):
    return np.vectorize(fuzz.token_sort_ratio(df1, df2))

这就是我尝试使用上述函数的方式:

result_df.withcolumn("VAL", fuzz_ratio(col(df1.VAL), col(df2.VAL)))

df1df2是输入。这两个数据框的VAL列都包含我需要输入到函数fuzz_ratio的值。输出应保存在result_dfVAL列中。

示例:

Val是所有 Dataframe 中的列名。df1df2列val为字符串类型。

p3rjfoxz

p3rjfoxz1#

当您将这两个列移动到相同的 Dataframe 时,可以使用类似以下pandas_udf的代码。pandas_udf进行了矢量化以提高性能。它与普通的Spark udf不同。

输入:

from pyspark.sql import functions as F
import pandas as pd
from fuzzywuzzy import fuzz

df = spark.createDataFrame([('danial khilji', 'danial'), ('a','as',)], ['col1', 'col2'])
df.show()

# +-------------+------+

# |         col1|  col2|

# +-------------+------+

# |danial khilji|danial|

# |            a|    as|

# +-------------+------+

脚本:

@F.pandas_udf('long')
def fuzz_ratio(c1: pd.Series, c2: pd.Series) -> pd.Series:
    return pd.concat([c1, c2], axis=1).apply(lambda x: fuzz.token_sort_ratio(x[0], x[1]), axis=1)

df.withColumn('fuzz_ratio', fuzz_ratio('col1', 'col2')).show()

# +-------------+------+----------+

# |         col1|  col2|fuzz_ratio|

# +-------------+------+----------+

# |danial khilji|danial|        63|

# |            a|    as|        67|

# +-------------+------+----------+

相关问题