pysparkDataframe与给定的主键列表连接

2o7dmzc5  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(382)

我怀疑这是否可能。
让我们谈谈我的要求,我有 tableA 具有多个主键。 primary_key: ['user_id', 'role_id'] 像这样,多个表中有2个以上的pk,所有pk都在json中定义,如下所示。

{  "sourcetable": "app_setting",
   "schema": "dbo",
    "primarykey": [
                "application_code",
                "region_code",
                "country_code",
                "app_setting_key",
                "app_setting_value"
    ]
}

在同一张表上,我定义了两个Dataframe,

Df1 = spark.read.parquet(tableA)    # complete table
df2 = Df1.filter((df1.user_id == 1) & (df1.user_id==1)) # df2 is filter Df

现在我想加入这些df1和df2

join_Df= Df1.join(df2 , df2[primary_key] == Df1["primary_key"], "inner")

但我有个错误:
在join assert isinstance(on[0],column)中,“on should be column or list of column”assertionerror:on should be column or list of column
这个连接可以与pks列表连接吗?

vu8f3i0k

vu8f3i0k1#

我想你的意思是复合主键而不是多个pk。
只需将主键作为连接条件中的字符串列表传递:

table_info = {
        "sourcetable": "app_setting", "schema": "dbo",
        "primarykey": ["application_code", "region_code", "country_code", "app_setting_key", "app_setting_value"]
    }

df_result = df1.alias("DF1").join(df2.alias("DF2"), table_info["primarykey"])

或者,如果您喜欢使用列,您可以遍历组成pk的列名列表,并创建如下连接条件:

from functools import reduce
from pyspark.sql import functions as F

join_cols = [
    F.col(f"DF1.{c}") == F.col(f"DF2.{c}")
    for c in table_info["primarykey"]
]

df_result = df1.alias("DF1").join(
    df2.alias("DF2"),
    reduce(lambda acc, x: acc & x, join_cols),
    "inner"
)

相关问题