pyspark自定义排序,部分值已知,比udf更有效

vaj7vani  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(173)

我的代码工作,但寻找更高效的方法,而不是udf作为我的框架是巨大的,这个udf可能会使它效率较低。

from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, 'a', 'A'),
 (2, 'a', 'B'),
 (3, 'a', 'C'),
 (4, 'b', 'A'),
 (5, 'b', 'B'),
 (6, 'b', 'C'),
 (7, 'c', 'A'),
 (8, 'c', 'B'),
 (9, 'c', 'C')],
['id', 'c1', 'c2']
)

from itertools import chain

from collections import defaultdict

custom_dict = {'c': 0,'a':1}
# Define the custom order as a defaultdict
my_Order = defaultdict(lambda: float('inf'))
my_Order.update(custom_dict)

# Create a UDF to get the custom order value for each value in the column
get_order = F.udf(lambda x: my_Order[x])

# Add a new column with the custom order value
df = df.withColumn("order_value", get_order(F.col("c1")))

# Order the DataFrame based on the custom order value
df = df.orderBy("order_value")
df.show()

字符串
我还尝试使用下面的而不是udf,但抛出错误,因为getItem不能与defaultdict一起使用

df = df.withColumn("order_value", F.when(F.col("c1").isin(list(my_order.keys())), my_Order.getItem(F.col("c1")).otherwise(float('inf'))))

ztyzrc3y

ztyzrc3y1#

custom_dict转换为case语句,并将else部分作为默认值。

stmt = "case c1 "
for k,v in custom_dict.items():
    stmt = stmt + f" when '{k}' then {v}"
stmt = stmt + " else cast('Infinity' as double) end"

字符串
测试结果:

case c1  when 'c' then 0 when 'a' then 1 else cast('Infinity' as double) end


然后使用此语句添加一列,并对该列进行排序

df.withColumn("order_value", F.expr(stmt))\
  .orderBy("order_value") \
  .show()


输出量:

+---+---+---+-----------+
| id| c1| c2|order_value|
+---+---+---+-----------+
|  7|  c|  A|        0.0|
|  9|  c|  C|        0.0|
|  8|  c|  B|        0.0|
|  1|  a|  A|        1.0|
|  3|  a|  C|        1.0|
|  2|  a|  B|        1.0|
|  4|  b|  A|   Infinity|
|  5|  b|  B|   Infinity|
|  6|  b|  C|   Infinity|
+---+---+---+-----------+

相关问题