为什么我在我的pyspark查询中得到列模糊错误?

zxlwwiss  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(171)

我试图创建一个列,用于标识哪些列作为变更数据馈送的一部分进行了更新(我遵循了以下堆栈溢出:Compare two dataframes Pyspark),但是当我引用自己的表时,我收到了以下错误:

  1. AnalysisException:Column _commit_version#203599L, subscribe_status#203595, _change_type#203598, _commit_timestamp#203600, subscribe_dt#203596, end_sub_dt#203597 are ambiguous.

字符串
这可能是因为您将多个数据集联接在一起,而其中一些数据集是相同的。此列指向其中一个数据集,但Spark无法确定是哪个数据集。请在联接数据集之前通过Dataset.as使用不同名称的数据集的别名,并使用限定名指定列,例如:
df.as("a").join(df.as("b"), $"a.id" > $"b.id")的值。“
也可以将spark.sql.analyzer.failAmbiguousSelfJoin设置为false以禁用此检查。
下面是我的代码:

  1. df_X = df1.filter(df1['_change_type'] == 'update_preimage')
  2. df_Y = df1.filter(df1['_change_type'] == 'update_postimage')
  3. dfX.show()
  4. dfY.show()
  5. from pyspark.sql.functions import col, array, lit, when, array_remove
  6. # get conditions for all columns except id
  7. # conditions_ = [when(dfX[c]!=dfY[c], lit(c)).otherwise("") for c in dfX.columns if c != ['external_id', '_change_type']]
  8. select_expr =[
  9. col("external_id"),
  10. *[dfY[c] for c in dfY.columns if c != 'external_id'],
  11. # array_remove(array(*conditions_), "").alias("column_names")
  12. ]
  13. print(select_expr)
  14. dfX.join(dfY, "external_id").select(*select_expr).show()```

wh6knrhe

wh6knrhe1#

我得到了它,我给我的初始变量加了别名,它起作用了:

  1. dfY = df1.filter(df1['_change_type'] == 'update_postimage').alias('y')
  2. dfX.show()
  3. dfY.show()
  4. from pyspark.sql.functions import col, array, lit, when, array_remove
  5. # get conditions for all columns except id
  6. conditions_ = [
  7. when(col("x." + c) != col("y." + c), lit(c)).otherwise("").alias("condition_" + c)
  8. for c in dfX.columns if c not in ['external_id', '_change_type']
  9. ]
  10. select_expr =[
  11. col("external_id"),
  12. *[col("y." + c).alias("y_" + c) for c in dfY.columns if c != 'external_id'],
  13. array_remove(array(*conditions_), "").alias("column_names")
  14. ]
  15. display(dfX.join(dfY, "external_id").select(*select_expr))

字符串

展开查看全部

相关问题