有没有办法在Pyspark中使用map/dict来避免CASE WHEN条件等于对?

wgx48brx  于 2023-02-16  发布在  Apache
关注(0)|答案(2)|浏览(103)

我在Pyspark中遇到了一个问题,它基于另一个列中的值为新的 Dataframe 创建一个列。这很无聊,在我看来,使用大量的

CASE
  WHEN column_a = 'value_1' THEN 'value_x'
  WHEN column_a = 'value_2' THEN 'value_y'
  ...  
  WHEN column_a = 'value_289' THEN 'value_xwerwz'
END

在这种情况下,在python中,我习惯于使用一个dict或者更好的一个configparser文件来避免if else条件,我只需要传递键,python就会返回想要的值,而且,我们为ELSE子句提供了一个“fallback”选项。
在我看来,问题是我们不是在一个命令中处理一行,而是所有行,因此使用dict/map/configparser是一个不可用的选项。我曾想过使用dict循环,但它似乎太慢,而且在重复所有条件时浪费计算。
我还在寻找这种做法,如果我找到了,我会把它贴在这里。但是,你知道,可能很多人已经使用它,我还不知道。但如果没有其他方法,好吧。使用许多当那么条件将不是一个选择。
谢谢
我尝试使用dict并搜索了如下解决方案

v440hwme

v440hwme1#

一种选择是使用字典创建 Dataframe 并执行连接
这是可行的:
创建 Dataframe :

dict={"value_1": "value_x", "value_2": "value_y"}
dict_df=spark.createDataFrame([(k,v) for k,v in dict.items()], ["key","value"])

执行联接:

df.alias("df1")\
.join(F.broadcast(dict_df.alias("df2")), F.col("column_a")==F.col("key"))\
.selectExpr("df1.*","df2.value as newColumn")\
.show()

我们可以广播dict_df,因为它很小。
输入:

字典_df:

输出:

或者,可以使用UDF-但不建议这样做。

ma8fv8wu

ma8fv8wu2#

你可以创建一个函数,将一个dict转换成一个Spark F.,例如:

import pyspark.sql.functions as F

def create_spark_when(column, conditions, default):
    when = None
    for key, value in conditions.items():
        current_when = F.when(F.col(column) == key, value)
        if when is None:
            when = current_when.otherwise(default)
        else:
            when = current_when.otherwise(when)
    return when

df = spark.createDataFrame([(0,), (1,), (2,)])
df.show()
my_conditions = {1: "a", 2: "b"}
my_default = "c"
df.withColumn(
    "my_column",
    create_spark_when("_1", my_conditions, my_default),
).show()

输出:

+---+
| _1|
+---+
|  0|
|  1|
|  2|
+---+

+---+---------+
| _1|my_column|
+---+---------+
|  0|        c|
|  1|        a|
|  2|        b|
+---+---------+

相关问题