pyspark 如果数据匹配,则根据另一个数据框删除行并插入新数据

czfnxgou  于 2022-11-28  发布在  Spark
关注(0)|答案(2)|浏览(164)

我有两个文件,一个是file1.csv,另一个是file2.csv。我已将file1数据放在一个 Dataframe 中,当第二个文件file2.csv到达时,我必须编写一个代码,如果第二个文件数据与第一个文件数据基于年和月列匹配,则从file1 Dataframe 中删除数据,因为它是旧数据,并在file1 Dataframe 中插入新的file2数据

文件1.csv

年月金额
2022年8月12日
2022年10月10日
2021年1月20日
2020年3月30日

文件2.csv

年月金额
二零二二年一月二二零
2022年2月130日
2022年10月100日

最终输出

年月金额
2022年8月12日
2022年10月100日
2021年1月20日
2020年3月30日
2022年2月130日
二零二二年一月二二零
我一直在尝试是否存在条件在pyspark,但它是不工作的

gdrx4gfi

gdrx4gfi1#

如果第二文件数据基于年和月列与第一文件数据匹配,则从文件1 Dataframe 中删除数据,因为它是旧数据,并在文件1 Dataframe 中插入新的文件2数据
您可以通过以下步骤完成此操作:

  • 执行LEFT联接,将file2.csv Dataframe 中的匹配行分配到file1.csv Dataframe 中,如果未找到匹配项,则分配NULL
  • file2.csv Dataframe 中的year-month对执行when-otherwise转换:如果要在最终输出中包含列为NOT NULL,则获取这些列,否则,获取file1.csv DataFrame中列

有关when-otherwise转换-https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/的更多信息

xzv2uavs

xzv2uavs2#

以下是我的2分:
1.从2个CSV文件创建2个 Dataframe (在我的情况下,我只是使用静态数据创建)

from pyspark.sql.functions import *
 from pyspark.sql.window import *

 data1 = [
 (2022, 'Aug', 12),
 (2022, 'Oct', 10),
 (2021, 'Jan', 20),
 (2020, 'March', 30)]

 data2 = [
 (2022, 'Jan', 220),
 (2022, 'Feb', 130),
 (2022, 'Oct', 100)]

 df_main = spark.createDataFrame(data1,schema = ['year', 'month', 'Amount'])
 df_incremental = spark.createDataFrame(data2,schema = ['year', 'month', 'Amount'])

1.然后在year和month的顶部使用row_number(),然后在计算后仅筛选row_number为1的行,然后删除row_number列。

df_merge = df_incremental.unionAll(df_main)

 windowSpec = Window.partitionBy('year', 'month').orderBy('year', 'month')
 df_merge = df_merge.withColumn("_row_number", row_number().over(windowSpec))
 df_merge = df_merge.where(df_merge._row_number == 1).drop("_row_number")

 df_merge.show()

请查看以下图片以供参考:

相关问题