使用条件结果列连接pysparkDataframe

ryoqjall  于 2021-07-12  发布在  Spark
关注(0)|答案(4)|浏览(324)

我有这些table:

df1                  df2
+---+------------+   +---+---------+
| id|   many_cols|   | id|criterion|
+---+------------+   +---+---------+
|  1|lots_of_data|   |  1|    false|
|  2|lots_of_data|   |  1|     true|
|  3|lots_of_data|   |  1|     true|
+---+------------+   |  3|    false|
                     +---+---------+

我打算在 df1 :

+---+------------+------+
| id|   many_cols|result|
+---+------------+------+
|  1|lots_of_data|     1|
|  2|lots_of_data|  null|
|  3|lots_of_data|     0|
+---+------------+------+
``` `result` 应该是 `1` 如果有相应的 `true` 在
df2 `result` 应该是 `0` 如果没有相应的 `true` 在
df2 `result` 应该是 `null` 如果没有相应的 `id` 在 `df2` 我想不出一个有效的方法来做这件事。在加入后,我只能满足第三个条件:

df = df1.join(df2, 'id', 'full')
df.show()

+---+------------+---------+

| id| many_cols|criterion|

+---+------------+---------+

| 1|lots_of_data| false|

| 1|lots_of_data| true|

| 1|lots_of_data| true|

| 3|lots_of_data| false|

| 2|lots_of_data| null|

+---+------------+---------+

PyparkDataframe的创建方式如下:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df1cols = ['id', 'many_cols']
df1data = [(1, 'lots_of_data'),
(2, 'lots_of_data'),
(3, 'lots_of_data')]
df2cols = ['id', 'criterion']
df2data = [(1, False),
(1, True),
(1, True),
(3, None)]
df1 = spark.createDataFrame(df1data, df1cols)
df2 = spark.createDataFrame(df2data, df2cols)

avkwfej4

avkwfej41#

一个简单的方法就是分组 df2 以获得最大值 criterionid 加入 df1 ,这样可以减少要连接的行数。如果至少有一个对应的真值,则布尔列的最大值为真:

from pyspark.sql import functions as F

df2_group = df2.groupBy("id").agg(F.max("criterion").alias("criterion"))

result = df1.join(df2_group, ["id"], "left").withColumn(
    "result",
    F.col("criterion").cast("int")
).drop("criterion")

result.show()

# +---+------------+------+

# | id|   many_cols|result|

# +---+------------+------+

# |  1|lots_of_data|     1|

# |  3|lots_of_data|     0|

# |  2|lots_of_data|  null|

# +---+------------+------+
cunj1qz1

cunj1qz12#

您可以尝试使用相关子查询从df2获取最大布尔值,并将其转换为整数。

df1.createOrReplaceTempView('df1') 
df2.createOrReplaceTempView('df2') 

df = spark.sql("""
    select
        df1.*,
        (select int(max(criterion)) from df2 where df1.id = df2.id) as result
    from df1
""")

df.show()
+---+------------+------+
| id|   many_cols|result|
+---+------------+------+
|  1|lots_of_data|     1|
|  3|lots_of_data|     0|
|  2|lots_of_data|  null|
+---+------------+------+
1zmg4dgp

1zmg4dgp3#

看看这个解决方案。加入后。您可以根据需要使用多个条件检查,并使用when子句相应地指定值,然后按id和其他列获取结果分组的最大值。如果只对分区使用just id,也可以使用window函数来计算结果的最大值。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df1cols = ['id', 'many_cols']
df1data = [(1, 'lots_of_data'),
           (2, 'lots_of_data'),
           (3, 'lots_of_data')]
df2cols = ['id', 'criterion']
df2data = [(1, False),
           (1, True),
           (1, True),
           (3, False)]
df1 = spark.createDataFrame(df1data, df1cols)
df2 = spark.createDataFrame(df2data, df2cols)

df2_mod =df2.withColumnRenamed("id", "id_2")

df3=df1.join(df2_mod, on=df1.id== df2_mod.id_2, how='left')

cond1 = (F.col("id")== F.col("id_2"))& (F.col("criterion")==1)
cond2 = (F.col("id")== F.col("id_2"))& (F.col("criterion")==0)
cond3 = (F.col("id_2").isNull())

df3.select("id", "many_cols", F.when(cond1, 1).when(cond2,0).when(cond3, F.lit(None)).alias("result"))\
    .groupBy("id", "many_cols").agg(F.max(F.col("result")).alias("result")).orderBy("id").show()

Result:
------

+---+------------+------+
| id|   many_cols|result|
+---+------------+------+
|  1|lots_of_data|     1|
|  2|lots_of_data|  null|
|  3|lots_of_data|     0|
+---+------------+------+

使用窗口函数

w=Window().partitionBy("id")

df3.select("id", "many_cols", F.when(cond1, 1).when(cond2,0).when(cond3, F.lit(None)).alias("result"))\
    .select("id", "many_cols", F.max("result").over(w).alias("result")).drop_duplicates().show()
jbose2ul

jbose2ul4#

为了得到最适合我的解决方案,我不得不把提出的答案的想法合并起来。


# The `cond` variable is very useful, here it represents several complex conditions

cond = F.col('criterion') == True
df2_grp = df2.select(
    'id',
    F.when(cond, 1).otherwise(0).alias('c')
).groupBy('id').agg(F.max(F.col('c')).alias('result'))
df = df1.join(df2_grp, 'id', 'left')

df.show()

# +---+------------+------+

# | id|   many_cols|result|

# +---+------------+------+

# |  1|lots_of_data|     1|

# |  3|lots_of_data|     0|

# |  2|lots_of_data|  null|

# +---+------------+------+

相关问题