Pyspark transform function with UDF not working

u0njafvf  posted 2024-01-06 in Spark

I'm trying to add a new column 'parsed_date' (a string parsed into a date) to each struct in an array of structs in my PySpark DataFrame. To do that I use the dateparser.parse function, because my dates can come in unpredictable formats, and I process the array with transform() and withField().
I tried the following code:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf, transform
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DateType
    import dateparser

    spark = SparkSession.builder.getOrCreate()

    # Sample dataframe
    data = [
        (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
        (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("array_of_structs", ArrayType(
            StructType([
                StructField("date_field", StringType(), True),
                StructField("detail", StringType(), True)
            ])
        ), True)
    ])
    df = spark.createDataFrame(data, schema)

    # UDF to parse the dates
    @udf
    def my_udf(x):
        return dateparser.parse(x)

    # Apply the UDF to each element of the array of structs
    result = df.withColumn("array_of_structs", transform(
        "array_of_structs",
        lambda x: x.withField("parsed_date", my_udf(x["date_field"]))
    ))
    result.show(truncate=False)

But I get the following error:

    org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: my_udf(lambda x_24#970.date_field)#968


I don't understand how to use the transform() function together with a UDF. Any help would be appreciated!

cedebl8k

You can't use F.transform with a Python UDF: the lambda passed to transform is evaluated as a SQL expression inside the JVM, so a Python UDF cannot run inside it (hence the INTERNAL_ERROR). Instead, pass the whole array to a UDF and give the UDF a returnType that describes the new array of structs. You can adapt the example below to use dateparser (see the sketch after the output).

    from datetime import datetime

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    data = [
        (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
        (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
    ]
    schema = T.StructType([
        T.StructField('id', T.IntegerType(), True),
        T.StructField('array_of_structs', T.ArrayType(
            T.StructType([
                T.StructField('date_field', T.StringType(), True),
                T.StructField('detail', T.StringType(), True)
            ])
        ), True)
    ])
    df = spark.createDataFrame(data, schema)

    # The UDF receives the whole array and returns a new array, so the
    # returnType must describe the full array of structs, including the
    # new parsed_date field.
    return_type = T.ArrayType(
        T.StructType([
            T.StructField('date_field', T.StringType(), True),
            T.StructField('detail', T.StringType(), True),
            T.StructField('parsed_date', T.DateType(), True)
        ]), True
    )

    @F.udf(returnType=return_type)
    def my_udf(array_struct: list) -> list:
        array = []
        for struct in array_struct:
            try:
                parsed_date = datetime.fromisoformat(struct['date_field'])
            except ValueError:
                parsed_date = None
            array.append({
                'date_field': struct['date_field'],
                'detail': struct['detail'],
                'parsed_date': parsed_date
            })
        return array

    new_struct = my_udf('array_of_structs').alias('new_array_of_structs')
    df.select(new_struct).show(2, False)
    # +------------------------------------------------------+
    # |new_array_of_structs                                  |
    # +------------------------------------------------------+
    # |[{january 2023, detail1, null}, {2011, detail2, null}]|
    # |[{2021-07-15, detail3, 2021-07-15}]                   |
    # +------------------------------------------------------+

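To get the fuzzy parsing the question asks for, the same UDF can be rewritten with dateparser.parse in place of datetime.fromisoformat. This is a minimal sketch, not a tested drop-in: it reuses return_type and df from the snippet above and assumes the dateparser package is installed on every executor, not just the driver.

    import dateparser

    @F.udf(returnType=return_type)
    def my_udf(array_struct: list) -> list:
        array = []
        for struct in array_struct:
            # dateparser.parse handles free-form strings like "january 2023"
            # and returns a datetime, or None if it cannot parse the input.
            parsed = dateparser.parse(struct['date_field'])
            array.append({
                'date_field': struct['date_field'],
                'detail': struct['detail'],
                # DateType expects a date, so drop the time component
                'parsed_date': parsed.date() if parsed else None
            })
        return array

    df.select(my_udf('array_of_structs').alias('new_array_of_structs')).show(2, False)

With this variant, inputs like "january 2023" and "2011" should also come back as dates (dateparser fills in the missing day or month) instead of null as in the fromisoformat version above.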

