Using 2 columns of a PySpark DataFrame as a lookup to replace all other columns

Asked by 5hcedyr0 on 2021-07-13 in Spark

I have a dataset that represents a supervisory hierarchy. The first two columns are id and name, and the following columns are level 1, level 2, level 3, and so on.
Each value in a level column is a number that refers back to the id column.

id     name         level 1   level 2   level 3
11     sup org 1    222       333       444
222    sup org 2    11        222       333 
333    sup org 3    11        222       333 
444    sup org 4    222       444       333
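
For reproducibility, a minimal snippet that builds this sample data (assuming an active SparkSession) could look like:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample hierarchy; each level value refers back to an id
df = spark.createDataFrame(
    [(11, "sup org 1", 222, 333, 444),
     (222, "sup org 2", 11, 222, 333),
     (333, "sup org 3", 11, 222, 333),
     (444, "sup org 4", 222, 444, 333)],
    ["id", "name", "level 1", "level 2", "level 3"],
)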

Here is what I'm looking for:

id     name         level 1     level 2     level 3
11     sup org 1    sup org 2   sup org 3   sup org 4
222    sup org 2    sup org 1   sup org 2   sup org 3 
333    sup org 3    sup org 1   sup org 2   sup org 3 
444    sup org 4    sup org 2   sup org 4   sup org 3

I tried using an rdd function, but I got an error saying the function is not whitelisted.
I then tried the following, where sup_lookup is the first two columns of the table above and sup_org is the whole table:

from itertools import chain
from pyspark.sql.functions import create_map

dict1 = [row.asDict() for row in sup_lookup.collect()]

mapping_expr = create_map([x for x in chain(*dict1.items())])

df = sup_org.withColumn('level 1', mapping_expr[sup_org['level 1']]) \
    .withColumn('level 2', mapping_expr[sup_org['level 2']]) \
    .withColumn('level 3', mapping_expr[sup_org['level 3']])

But I get an error that the dict1 list has no attribute .items().
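
(Side note: that error happens because dict1 is a list of per-row dicts, so it has no .items(). A sketch of the map-based approach, which first flattens the lookup into a single {id: name} dict and wraps keys and values as literal columns for create_map, would be:)

from itertools import chain
from pyspark.sql.functions import create_map, lit

# Flatten the lookup rows into one {id: name} dict
lookup = {row['id']: row['name'] for row in sup_lookup.collect()}

# create_map takes alternating literal key/value columns
mapping_expr = create_map([lit(x) for x in chain(*lookup.items())])

df = sup_org.withColumn('level 1', mapping_expr[sup_org['level 1']]) \
    .withColumn('level 2', mapping_expr[sup_org['level 2']]) \
    .withColumn('level 3', mapping_expr[sup_org['level 3']])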


kx1ctssn1#

You can do a self-join for each level column:

from pyspark.sql import functions as F

df1 = df.alias("df") \
    .join(df.alias("lvl1"), F.col("lvl1.id") == F.col("df.`level 1`"), "left") \
    .join(df.alias("lvl2"), F.col("lvl2.id") == F.col("df.`level 2`"), "left") \
    .join(df.alias("lvl3"), F.col("lvl3.id") == F.col("df.`level 3`"), "left") \
    .selectExpr("df.id", "df.name", "lvl1.name as `level 1`", "lvl2.name as `level 2`", "lvl3.name as `level 3`")

df1.show()

# +---+---------+---------+---------+---------+
# | id|     name|  level 1|  level 2|  level 3|
# +---+---------+---------+---------+---------+
# |222|sup org 2|sup org 1|sup org 2|sup org 3|
# |333|sup org 3|sup org 1|sup org 2|sup org 3|
# |444|sup org 4|sup org 2|sup org 4|sup org 3|
# | 11|sup org 1|sup org 2|sup org 3|sup org 4|
# +---+---------+---------+---------+---------+
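
If the number of level columns isn't fixed, the same self-join can be built in a loop. A sketch, assuming the columns to resolve are listed in level_cols (a name introduced here for illustration):

from pyspark.sql import functions as F

level_cols = ["level 1", "level 2", "level 3"]

result = df.alias("df")
select_cols = [F.col("df.id"), F.col("df.name")]
for i, c in enumerate(level_cols):
    a = f"lvl{i}"  # unique alias for each self-join
    result = result.join(df.alias(a), F.col(f"{a}.id") == F.col(f"df.`{c}`"), "left")
    select_cols.append(F.col(f"{a}.name").alias(c))

result = result.select(*select_cols)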

xurqigkl2#

You can use correlated subqueries to get the corresponding name for each id:

df.createOrReplaceTempView('df')
result = spark.sql("""
    select
        id,
        name,
        (select first(df2.name) from df as df2 where df1.`level 1` = df2.id) as `level 1`,
        (select first(df2.name) from df as df2 where df1.`level 2` = df2.id) as `level 2`,
        (select first(df2.name) from df as df2 where df1.`level 3` = df2.id) as `level 3`
    from df as df1
""")

result.show()
+---+---------+---------+---------+---------+
| id|     name|  level 1|  level 2|  level 3|
+---+---------+---------+---------+---------+
| 11|sup org 1|sup org 2|sup org 3|sup org 4|
|222|sup org 2|sup org 1|sup org 2|sup org 3|
|333|sup org 3|sup org 1|sup org 2|sup org 3|
|444|sup org 4|sup org 2|sup org 4|sup org 3|
+---+---------+---------+---------+---------+
