我必须比较csv文件,然后我必须删除所有重复的行。所以,我的条件是,我有一个文件夹,我必须把每个筛选结果放在该文件夹中,当一些新文件来时,我必须将文件夹中现有的文件与新文件进行比较,最后,我必须将结果放回同一个文件夹。
eg: /data/ingestion/file1.csv
a1 b1 c1
a2 b2 c2
a3 b3 c3
/data/ingestion/file2.csv
a4 b4 c4
a5 b5 c5
a6 b6 c6
new upcoming file(upcoming_file.csv):
a1 b1 c1
a5 b5 c5
a7 b7 c7
现在我的方法是从/data/ingestion/*中的所有文件创建一个Dataframe。然后创建一个即将到来的\u file.csv的Dataframe,并使用union操作附加这两个Dataframe。最后,应用离散变换。现在我必须把它写回/data/inspection,以确保不会出现重复。所以,我选择覆盖操作。
deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion/")
最后我删除了文件夹/数据/摄取中的所有内容。即使是新的Dataframe也不能写入csv文件。
我也尝试过其他的选择,但是我没有达到我上面解释的效果!
提前谢谢!
1条答案
按热度按时间mrphzbgm1#
我建议将输出写入hdfs上的新目录—在处理失败的情况下,您将始终能够丢弃已处理的内容,并使用原始数据从头开始处理—这既安全又简单。:)
当处理完成后-只需删除旧的一个和重命名新的一个旧的名称。
更新:
这里是到hdfs文件系统api文档的链接