我的代码工作,但寻找更高效的方法,而不是udf作为我的框架是巨大的,这个udf可能会使它效率较低。
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, 'a', 'A'),
(2, 'a', 'B'),
(3, 'a', 'C'),
(4, 'b', 'A'),
(5, 'b', 'B'),
(6, 'b', 'C'),
(7, 'c', 'A'),
(8, 'c', 'B'),
(9, 'c', 'C')],
['id', 'c1', 'c2']
)
from itertools import chain
from collections import defaultdict
custom_dict = {'c': 0,'a':1}
# Define the custom order as a defaultdict
my_Order = defaultdict(lambda: float('inf'))
my_Order.update(custom_dict)
# Create a UDF to get the custom order value for each value in the column
get_order = F.udf(lambda x: my_Order[x])
# Add a new column with the custom order value
df = df.withColumn("order_value", get_order(F.col("c1")))
# Order the DataFrame based on the custom order value
df = df.orderBy("order_value")
df.show()
字符串
我还尝试使用下面的而不是udf,但抛出错误,因为getItem不能与defaultdict一起使用
df = df.withColumn("order_value", F.when(F.col("c1").isin(list(my_order.keys())), my_Order.getItem(F.col("c1")).otherwise(float('inf'))))
型
1条答案
按热度按时间ztyzrc3y1#
将
custom_dict
转换为case语句,并将else
部分作为默认值。字符串
测试结果:
型
然后使用此语句添加一列,并对该列进行排序
型
输出量:
型