PySpark: I want to compare two DataFrames with the same columns

w6mmgewl · posted 2024-01-06 in Spark

I want to compare two DataFrames that have the same columns. I just need to add a last_change_date column to df1, populated from df2's last_change_date.
Sample DataFrames:

data_df = [("John", 25, "Male", "Engineer", 1),
           ("Alice", 30, "Female", "Doctor", 2)]
data_df2 = [("1", "John", 25, "Male", "Engineer", "2023-01-01"),
            ("2", "Alice", 30, "Female", "Doctor", "2023-01-02")]

I am trying this code:

def compare_dataframes(df1, df2, key_fields):
    df1_aliases = [col(f"{field}").alias(f"{field}_df1") for field in df1.columns if field not in key_fields]
    df2_aliases = [col(f"{field}").alias(f"{field}_df2") for field in df2.columns if field not in key_fields]
    join_conditions = [col(f"{field}_df1") == col(f"{field}_df2") for field in key_fields]
    result_df = df1.select(*df1.columns, *df1_aliases).alias("df1").join(
        df2.select(*df2.columns, *df2_aliases).alias("df2"),
        on=join_conditions,
        how="inner"
    )
    for field in df1.columns:
        if field not in key_fields:
            result_df = result_df.withColumn(
                f"last_change_date_{field}",
                when(
                    (col(f"{field}_df1") != col(f"{field}_df2")) | col(f"last_change_date_{field}_df1").isNull(),
                    current_timestamp()
                ).otherwise(col(f"last_change_date_{field}_df2"))
            )
    return result_df


But I can't join on the key fields. I tried using aliases, but it gives this error:

[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Key_df1` cannot be resolved. Did you mean one of the following? [`df1`.`Age`, `df2`.`Age`, `df1`.`Age_df1`, `df1`.`Key`, `df2`.`Key`].;
'Join Inner, ('Key_df1 = 'Key_df2)
:- SubqueryAlias df1
:  +- Project [Name#6424, Age#6425L, Key#6428L, Name#6424 AS Name_df1#6483, Age#6425L AS Age_df1#6484L]
:     +- Project [Name#6424, Age#6425L, Key#6428L]
:        +- LogicalRDD [Name#6424, Age#6425L, Gender#6426, Occupation#6427, Key#6428L], false
+- SubqueryAlias df2
   +- Project [Key#6434, Name#6435, Age#6436L, last_change_date#6439, Name#6435 AS Name_df2#6485, Age#6436L AS Age_df2#6486L, last_change_date#6439 AS last_change_date_df2#6487]
      +- Project [Key#6434, Name#6435, Age#6436L, last_change_date#6439]
         +- LogicalRDD [Key#6434, Name#6435, Age#6436L, Gender#6437, Occupation#6438, last_change_date#6439], false


How can I get the desired result?
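For context: the join fails because the alias lists skip key_fields, so a column named Key_df1 is never created, even though the join condition references it. A minimal sketch of one way to make the join resolve, assuming the key fields are aliased as well (the helper name is illustrative, not from the original code):

from pyspark.sql.functions import col

# Hypothetical helper (not from the original post): alias every column,
# including the key fields, so that Key_df1 / Key_df2 actually exist.
def join_on_aliased_keys(df1, df2, key_fields):
    left = df1.select([col(c).alias(f"{c}_df1") for c in df1.columns])
    right = df2.select([col(c).alias(f"{c}_df2") for c in df2.columns])
    conditions = [col(f"{k}_df1") == col(f"{k}_df2") for k in key_fields]
    return left.join(right, on=conditions, how="inner")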


uplii1fm1#

See whether the answer below helps. You can run this column-wise iteration on the joined DataFrame. In short, you don't need the _df1/_df2 aliases at all; you can reference the columns directly through their parent DataFrames:

from pyspark.sql.functions import when, current_timestamp

non_key_fields = [x for x in df1.columns if x not in key_fields]
for cl in non_key_fields:
    final_df = final_df.withColumn(
        "last_change_" + cl,
        # last_change_date lives on df2; df1 does not have this column
        when((df1[cl] != df2[cl]) | df2["last_change_date"].isNull(), current_timestamp())
        .otherwise(df2["last_change_date"])
    )
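For completeness, a minimal sketch of how final_df could be built before the loop above; the choice of key_fields here is an assumption, since the answer leaves the join out:

# Assumed setup for the snippet above: final_df is df1 inner-joined with df2
# on the key fields (key_fields = ["id", "name"] is illustrative).
key_fields = ["id", "name"]
final_df = df1.join(df2, [df1[k] == df2[k] for k in key_fields], "inner")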



yxyvkwin2#

If you just want to add one column to df1, you can simply use a join and reassign the DataFrame to add that column. This is much better than iterating over the values and adding it.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("DataFrameComparison").getOrCreate()

# Sample data
data_df = [("John", 25, "Male", "Engineer", 1),
           ("Alice", 30, "Female", "Doctor", 2)]
data_df2 = [("1", "John", 25, "Male", "Engineer", "2023-01-01"),
            ("2", "Alice", 30, "Female", "Doctor", "2023-01-02")]

# Create DataFrames
df1 = spark.createDataFrame(data_df, ["name", "age", "gender", "profession", "id"])
df2 = spark.createDataFrame(data_df2, ["id", "name", "age", "gender", "profession", "last_change_date"])

# Join the DataFrames on common columns and pull last_change_date from df2
df1 = df1.join(df2, (df1.id == df2.id) & (df1.name == df2.name)).select(df1["*"], df2["last_change_date"])
df1.show()


Result:

+-----+---+------+----------+---+----------------+
| name|age|gender|profession| id|last_change_date|
+-----+---+------+----------+---+----------------+
| John| 25|  Male|  Engineer|  1|      2023-01-01|
|Alice| 30|Female|    Doctor|  2|      2023-01-02|
+-----+---+------+----------+---+----------------+
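Note that an inner join drops any df1 row that has no match in df2. If those rows should be kept (with a null last_change_date), a left join is a small variant of the same idea:

# Left join keeps unmatched df1 rows; their last_change_date comes out null.
df1 = df1.join(df2, (df1.id == df2.id) & (df1.name == df2.name), "left") \
         .select(df1["*"], df2["last_change_date"])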

