pyspark中是否有.any()等价物?

q9rjltbz  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(463)

我想知道有没有办法 .any() 在Pypark?
我用python编写了下面的代码,它基本上搜索一个子集dataframe中感兴趣的特定列,如果其中任何一列包含 "AD" ,我们不想处理它们。
下面是python中的代码:

index_list = [
    df.query("id == @id").index 
    for trial in unique_trial_id_list 
    if ~(df.query("id == @trial")["unit"].str.upper().str.contains("AD").any()]

下面是Pandas中的一个示例Dataframe。 ID=1 有绳子吗 'AD' 与之关联,因此我们希望将其从处理中排除。然而, ID=2 没有那个字符串 'AD' 与之关联,因此我们希望将其包含在进一步处理中。

data = [
    [1, "AD"],
    [1, "BC"],
    [1, "DE"],
    [1, "FG"],
    [2, "XY"],
    [2, "BC"],
    [2, "DE"],
    [2, "FG"],
]
df = pd.DataFrame(data, columns=["ID", "Code"])
df

问题是我不知道如何在pyspark中实现这个等价函数。我已经能够做一个列表理解子集,并已能够使用子集 contains('AD') 但是当我遇到困难的时候 any 事情的一部分。
我想出的Pypark代码:

id = id_list[0] 
test = sdf.select(["ID", "Codes"]).filter(spark_fns.col("ID") == id).filter(~spark_fns.col("Codes").str.contains("AD"))
2ekbmq32

2ekbmq321#

可以使用窗口函数(如果至少有一个真值,则布尔值的最大值为真):

from pyspark.sql import functions as F, Window

df1 = df.withColumn(
    "to_exclude",
    ~F.max(F.when(F.col("Code") == "AD", True).otherwise(False)).over(Window.partitionBy("ID"))
).filter(
    F.col("to_exclude")
).drop("to_exclude")

df1.show()

# +---+----+

# | ID|Code|

# +---+----+

# |  2|  XY|

# |  2|  BC|

# |  2|  DE|

# |  2|  FG|

# +---+----+

或groupby id 以及使用 max 功能与 when 表达式来筛选包含 ADCode 列,然后与原始df连接:

from pyspark.sql import functions as F

filter_df = df.groupBy("id").agg(
    F.max(F.when(F.col("Code") == "AD", True).otherwise(False)).alias("to_exclude")
).filter(F.col("to_exclude"))

df1 = df.join(filter_df, ["id"], "left_anti")

在spark 3+中,还有一个函数 any :

from pyspark.sql import functions as F

filter_df = df.groupBy("id").agg(
    F.expr("any(Code = 'AD')").alias("to_exclude")
).filter(F.col("to_exclude"))

df1 = df.join(filter_df, ["id"], "left_anti")
e4yzc0pl

e4yzc0pl2#

您也可以尝试这样做-1)查找代码列中包含ad的所有id值,2)通过左反联接筛选出具有此类id的行

df = pd.DataFrame(data, columns=["ID", "Code"])
df_psp = spark.createDataFrame(df)

cols = ["ID"]
df_filter = df_psp.filter(F.col("Code").like('%AD%')).select(cols).distinct()
df_out = df_psp.join(df_filter, cols, "left_anti")

相关问题