PySpark custom sort with some values known — more efficient than a UDF

vaj7vani · asked 2024-01-06 · in Spark

My code works, but I'm looking for something more efficient than a UDF: my DataFrame is huge, and the UDF will likely make processing slower.

  from pyspark.sql import functions as F
  df = spark.createDataFrame(
      [(1, 'a', 'A'),
       (2, 'a', 'B'),
       (3, 'a', 'C'),
       (4, 'b', 'A'),
       (5, 'b', 'B'),
       (6, 'b', 'C'),
       (7, 'c', 'A'),
       (8, 'c', 'B'),
       (9, 'c', 'C')],
      ['id', 'c1', 'c2']
  )
  from itertools import chain
  from collections import defaultdict
  custom_dict = {'c': 0, 'a': 1}
  # Define the custom order as a defaultdict
  my_Order = defaultdict(lambda: float('inf'))
  my_Order.update(custom_dict)
  # Create a UDF to get the custom order value for each value in the column
  get_order = F.udf(lambda x: my_Order[x])
  # Add a new column with the custom order value
  df = df.withColumn("order_value", get_order(F.col("c1")))
  # Order the DataFrame based on the custom order value
  df = df.orderBy("order_value")
  df.show()

I also tried the following instead of a UDF, but it throws an error because `getItem` cannot be used on a `defaultdict`:

  df = df.withColumn("order_value", F.when(F.col("c1").isin(list(my_Order.keys())), my_Order.getItem(F.col("c1")).otherwise(float('inf'))))


ztyzrc3y1#

Convert `custom_dict` into a CASE statement, with the ELSE branch supplying the default value.

  stmt = "case c1 "
  for k, v in custom_dict.items():
      stmt = stmt + f" when '{k}' then {v}"
  stmt = stmt + " else cast('Infinity' as double) end"

The generated statement:

  case c1  when 'c' then 0 when 'a' then 1 else cast('Infinity' as double) end


Then add a column using this expression and sort on it:

  df.withColumn("order_value", F.expr(stmt)) \
      .orderBy("order_value") \
      .show()


Output:

  +---+---+---+-----------+
  | id| c1| c2|order_value|
  +---+---+---+-----------+
  |  7|  c|  A|        0.0|
  |  9|  c|  C|        0.0|
  |  8|  c|  B|        0.0|
  |  1|  a|  A|        1.0|
  |  3|  a|  C|        1.0|
  |  2|  a|  B|        1.0|
  |  4|  b|  A|   Infinity|
  |  5|  b|  B|   Infinity|
  |  6|  b|  C|   Infinity|
  +---+---+---+-----------+
