pyspark:如何过滤两个列值对的列表?

uujelgoq  于 2021-07-13  发布在  Spark
关注(0)|答案(3)|浏览(477)

所以我有一个pysparkDataframe,我想用两列的有效对的(长)列表来过滤它。
说我们的Dataframe的名字是 df 还有柱子 col1 以及 col2 :

col1   col2
1      A
2      B
3      1
null   2
A      null
2      null
1      null
B      C

我的有效配对列表如下: flist=[(1,A), (null,2), (1,null)] 当我试着 .isin() 函数(如下),它告诉我 .isin() 不是元组。

df.filter((df["col1"],df["col2"]).isin(flist))

通过连接两个字符串或为每一对写下一个布尔表达式,已经有了解决方法,但是我有一个很长的有效对列表(很难转换为布尔值),并且由于空值,连接也不可靠。使用python (df['col1'],df['col2']) in flist 也不起作用。
有没有一个pythonic/pysparkic方法可以做到这一点?

oknwwptz

oknwwptz1#

这里有一种不需要连接的方法,您可以在过滤器中链接一组条件,以便将每一行与中的值进行比较 flist . 它可以处理空值。

from functools import reduce
import pyspark.sql.functions as F

flist = [(1, 'A'), (None, 2), (1, None)] 

df2 = df.filter(
    reduce(
        lambda x, y: x | y, 
        [ 
            ((F.col('col1') == col1) if col1 is not None else F.col('col1').isNull()) & 
            ((F.col('col2') == col2) if col2 is not None else F.col('col2').isNull())
            for (col1, col2) in flist
        ]
    )
)

df2.show()
+----+----+
|col1|col2|
+----+----+
|   1|   A|
|null|   2|
|   1|null|
+----+----+
bxgwgixi

bxgwgixi2#

在@blackbishop的方法的基础上,您可以使用 Column.eqNullSafe 安全比较空值的方法:

df = spark.createDataFrame(
    [('1', 'A', 1),
     ('2', 'B', 2),
     ('3', '1', 3),
     (None, '2', 4),
     ('A', None, 5),
     ('2', None, 6),
     ('1', None, 7),
     ('B', 'C', 8)], schema=['col1', 'col2', 'col3'])

flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])

(df.join(filter_df,
         df["col1"].eqNullSafe(filter_df["col1"]) &
         df["col2"].eqNullSafe(filter_df['col2']))
 .select(df['col1'], df['col2'], df['col3'])
 .show())

给予:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|null|   7|
|null|   2|   4|
|   1|   A|   1|
+----+----+----+

请注意,如果“filter”数据框包含唯一的行,则连接只起到过滤器的作用。你可以加一个 distinct 在连接之前的Dataframe上确定(例如,如果您的筛选条件很大)。

db2dz4w8

db2dz4w83#

您可以创建 filder_df 使用列表并执行联接:

flist = [("1", "A"), (None, "2"), ("1", None)]
filter_df = spark.createDataFrame(flist, ["col1", "col2"])

df1 = df.join(filter_df, ["col1", "col2"])

df1.show()

# +----+----+

# |col1|col2|

# +----+----+

# |   1|   A|

# +----+----+

请注意,不能比较空值。所以只有行的元组 ("1", "A") 在这里返回。要检查空值,需要使用 isNull() 在列上:

df1 = df.alias("df").join(
    filter_df.alias("fdf"),
    ((F.col("df.col1") == F.col("fdf.col1")) |
     (col("df.col1").isNull() & F.col("fdf.col1").isNull())
     ) &
    ((F.col("df.col2") == F.col("fdf.col2")) |
     (col("df.col2").isNull() & F.col("fdf.col2").isNull())
     )
).select("df.*")

df1.show()

# +----+----+

# |col1|col2|

# +----+----+

# |   1|   A|

# |null|   2|

# |   1|null|

# +----+----+

或者更好的使用 eqNullSafe 正如@chris的回答所暗示的。

相关问题