如何将自定义函数应用于pyspark框架列

kwvwclae  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(182)
  1. @pandas_udf(StringType())
  2. def convert_num(y):
  3. try:
  4. if y.endswith('K')==True:
  5. y = list(y)
  6. y.remove(y[''.join(y).find('K')])
  7. if ''.join(y).startswith('€')==True:
  8. y.remove(y[''.join(y).find('€')])
  9. else:
  10. pass
  11. try :
  12. return str(int(''.join(y))*1000)
  13. except:
  14. return y
  15. elif y.endswith('M')==True:
  16. y = list(y)
  17. y.remove(y[''.join(y).find('M')])
  18. if ''.join(y).startswith('€')==True:
  19. y = list(y)
  20. y.remove(y[''.join(y).find('€')])
  21. else:
  22. pass
  23. try :
  24. return str(float(''.join(y))*1000000)
  25. except:
  26. return y
  27. except:
  28. return y

字符串
我把上面的UDF作为pandas UDF。
在我的spark框架中有一个名为Value的列。我想应用这个函数并转换它。
我用这个

  1. from pyspark.sql.functions import *
  2. df.select(convert_num(df.Value).alias('converted')).take(5)


但是它返回给我的是相同的值,而不是转换它。你可以在下面看到结果。

  1. Row(Player_name='T. Almada', Images='https://cdn.sofifa.net/players/245/371/24_60.png', Age=22, National_team='Argentina', Positions="['CAM', 'CM', 'CF']", Overall=79, Potential_overall=87, Current_club='Atlanta United', Current_contract='2022 ~ 2025', **Value='€39.5M'**, Wage='€10K', Total_stats=2050, **converted_amount='€39.5M'**)


我哪里做错了。

gwbalxhn

gwbalxhn1#

问题是@pandas_udf装饰器意味着convert_num(y)期望ySeries,但您将y视为字符串。
同样从调试的Angular 来看,在except块中返回y的多个try/except块将很难确定一个或多个try块中任何错误代码的来源(如果你得到相同的列值,那么在某个地方抛出了异常,但是从哪个except块?)。
请注意,如果删除外部try/except块,那么运行df.select(convert_num(df.Value)).take(5)将抛出:

  1. AttributeError: 'Series' object has no attribute 'endswith'

字符串
你可以通过重新构造你的convert_num函数来解决这个问题,将输入y视为Series,并在仍然使用相同的字符串逻辑的情况下输出Series:

  1. @pandas_udf(StringType())
  2. def convert_num(s):
  3. def convert_string(y):
  4. if y.endswith('K')==True:
  5. y = list(y)
  6. y.remove(y[''.join(y).find('K')])
  7. if ''.join(y).startswith('€')==True:
  8. y.remove(y[''.join(y).find('€')])
  9. else:
  10. pass
  11. try:
  12. return str(int(''.join(y))*1000)
  13. except:
  14. return y
  15. elif y.endswith('M')==True:
  16. y = list(y)
  17. y.remove(y[''.join(y).find('M')])
  18. if ''.join(y).startswith('€')==True:
  19. y = list(y)
  20. y.remove(y[''.join(y).find('€')])
  21. else:
  22. pass
  23. try:
  24. return str(float(''.join(y))*1000000)
  25. except:
  26. return y
  27. else:
  28. return y
  29. return s.apply(convert_string)


PySpark DataFrame df示例:

  1. +-----------+------+
  2. |Player_name| Value|
  3. +-----------+------+
  4. | PlayerA|€39.5M|
  5. | PlayerB| 390K|
  6. +-----------+------+


转换Value列后输出df

  1. df.select(convert_num(df.Value).alias('converted'))
  2. +----------+
  3. | converted|
  4. +----------+
  5. |39500000.0|
  6. | 390000|
  7. +----------+

展开查看全部
bsxbgnwa

bsxbgnwa2#

正如有人在评论中问到的,如何在不使用pandas的情况下在spark中实现自定义转换,这里有一个简单的例子:

  1. >>> from pyspark.sql.functions import col, udf
  2. >>> df = session.createDataFrame((("Jhon Doe", 1995), ("Elvis Kribs", 1998)))
  3. >>> df = df.toDF("Name", "YOB")
  4. >>> df.printSchema()
  5. root
  6. |-- Name: string (nullable = true)
  7. |-- YOB: long (nullable = true)
  8. >>> @udf
  9. ... def custom_tranformaion(val): return val + " UDF Value!"
  10. ...
  11. >>> df.withColumn("TransformedValue", custom_tranformaion("Name")).show(truncate=False)
  12. +-----------+----+----------------------+
  13. |Name |YOB |TransformedValue |
  14. +-----------+----+----------------------+
  15. |Jhon Doe |1995|Jhon Doe UDF Value! |
  16. |Elvis Kribs|1998|Elvis Kribs UDF Value!|
  17. +-----------+----+----------------------+

字符串
请注意@udf装饰器,它接受一个值并返回转换后的值,该值将成为框架的一部分。
注意:spark优化器通常很难优化udf中的代码。
参考文献:1 2

展开查看全部

相关问题