我试图创建一个列,用于标识哪些列作为变更数据馈送的一部分进行了更新(我遵循了以下堆栈溢出:Compare two dataframes Pyspark),但是当我引用自己的表时,我收到了以下错误:
AnalysisException:Column _commit_version#203599L, subscribe_status#203595, _change_type#203598, _commit_timestamp#203600, subscribe_dt#203596, end_sub_dt#203597 are ambiguous.
字符串
这可能是因为您将多个数据集联接在一起,而其中一些数据集是相同的。此列指向其中一个数据集,但Spark无法确定是哪个数据集。请在联接数据集之前通过Dataset.as
使用不同名称的数据集的别名,并使用限定名指定列,例如:df.as("a").join(df.as("b"), $"a.id" > $"b.id")
的值。“
也可以将spark.sql.analyzer.failAmbiguousSelfJoin
设置为false以禁用此检查。
下面是我的代码:
df_X = df1.filter(df1['_change_type'] == 'update_preimage')
df_Y = df1.filter(df1['_change_type'] == 'update_postimage')
dfX.show()
dfY.show()
from pyspark.sql.functions import col, array, lit, when, array_remove
# get conditions for all columns except id
# conditions_ = [when(dfX[c]!=dfY[c], lit(c)).otherwise("") for c in dfX.columns if c != ['external_id', '_change_type']]
select_expr =[
col("external_id"),
*[dfY[c] for c in dfY.columns if c != 'external_id'],
# array_remove(array(*conditions_), "").alias("column_names")
]
print(select_expr)
dfX.join(dfY, "external_id").select(*select_expr).show()```
型
1条答案
按热度按时间wh6knrhe1#
我得到了它,我给我的初始变量加了别名,它起作用了:
字符串