基于类方法创建pyspark dataframe列-带参数

ff29svar  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(541)

我有一个python类,它有如下函数:

class Features():
    def __init__(self, json):
        self.json = json

    def email_name_match(self,name):
        #some code
        return result

我目前拥有的PyparkDataframe如下所示:

+---------------+-----------
 |raw_json         |firstName
 +----------------+----------
 |                 |  
 +----------------+--------
 |                 |  
 +----------------+-------

我正在尝试使用pyspark数据框中的email\u name\u match函数来创建一个新列。因此,应该传递“raw\u json”列来初始化特性对象,“firstname”应该作为“email\u name\u match”方法的“name”参数传递。
我做了以下工作:

email_name_match_udf = F.udf(lambda j: NationalRetailFeatures(json.loads(j)).email_name_match())
df = avtk_gold.withColumn('firstname_email_match', F.udf(lambda j: NationalRetailFeatures(json.loads(j)).email_name_match(col("firstName")))("raw_json"))

但它不起作用。显示错误:

AttributeError: 'NoneType' object has no attribute '_jvm

我该怎么办?理想的Dataframe如下所示:

+---------------+-------------+-----------
 |raw_json       |firstName   |   name_email_match
 +----------------+------------------------
 |                 |          |
 +----------------+-------------------------
 |                 |          |
 +----------------+------------------------
niwlg2el

niwlg2el1#

你的问题遗漏了很多信息,但我想这可能有用:

import pyspark.sql.functions as F

class Features():
    def __init__(self, json):
        self.json = json

    def email_name_match(self,name):
        #some code
        return result

def my_udf(j, k):
    return Features(json.loads(j)).email_name_match(k)

spark_udf = F.udf(my_udf)

df = avtk_gold.withColumn('firstname_email_match', 
                          spark_udf(F.col("firstName"), F.col("raw_json"))
                         )

你不能在你的自定义项定义里放一个Spark列。只能将spark列传递到udf中。所以我有两个函数参数 j 以及 k .

相关问题