我目前在将python函数转换为python pyspark时遇到了一个问题,因为这两个函数都是不同的库。我想做的是拥有一个查询函数,然后将其应用回同一列。
这就是我为python pandas所做的工作(age是我试图从数据集中检索的列):
Age = [1, 3, -100, -99999, 39, 60, 87, 20, 21, 77777]
def clean_age(Age):
if Age>=0 and Age<=95:
return Age
else:
return np.nan
df['Age'] = df['Age'].apply(clean_age)
对于python pandas,它工作得很好,但现在我对python pyspark所做的就是这样,它不工作:
from pyspark.sql.types import IntegerType, IntegerType
from pyspark.sql.functions import udf
def clean_age(Age):
if Age>=0 and Age<=95:
return Age
else:
return NaN
spark.udf.register("clean_age", clean_age)
udf_myFunction = udf(clean_age, IntegerType())
new_df2 = new_df.withColumn('Age_Clean',udf_myFunction('Age'))
new_df2.show()
请建议我如何实现我从Pandas到Pypark的成果。提前谢谢!
2条答案
按热度按时间gfttwv5a1#
创建自定义项:
来自Dataframe的调用:
ev7lccsx2#
你应该考虑使用
pandas_udf
. 这是给你的Spark >= 2.3.0
(尽管功能复杂,但这可能有点过分):如果你想避免
pandas_udf
,任何版本的Spark >= 2.0.0
可以利用pyspark.sql.functions.when
以及otherwise
.请随意 Package
df.withColumn
然后调用带参数的函数df
又回来了df.withColumn
. 希望这有帮助。