在spark(hdfs)中写入csv文件要选择哪个选项?

x3naxklr  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(530)

我必须比较csv文件,然后我必须删除所有重复的行。所以,我的条件是,我有一个文件夹,我必须把每个筛选结果放在该文件夹中,当一些新文件来时,我必须将文件夹中现有的文件与新文件进行比较,最后,我必须将结果放回同一个文件夹。

eg: /data/ingestion/file1.csv

   a1 b1 c1

   a2 b2 c2

   a3 b3 c3

/data/ingestion/file2.csv

   a4 b4 c4

   a5 b5 c5

   a6 b6 c6

new upcoming file(upcoming_file.csv):

   a1 b1 c1

   a5 b5 c5

   a7 b7 c7

现在我的方法是从/data/ingestion/*中的所有文件创建一个Dataframe。然后创建一个即将到来的\u file.csv的Dataframe,并使用union操作附加这两个Dataframe。最后,应用离散变换。现在我必须把它写回/data/inspection,以确保不会出现重复。所以,我选择覆盖操作。

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion/")

最后我删除了文件夹/数据/摄取中的所有内容。即使是新的Dataframe也不能写入csv文件。
我也尝试过其他的选择,但是我没有达到我上面解释的效果!
提前谢谢!

mrphzbgm

mrphzbgm1#

我建议将输出写入hdfs上的新目录—在处理失败的情况下,您将始终能够丢弃已处理的内容,并使用原始数据从头开始处理—这既安全又简单。:)
当处理完成后-只需删除旧的一个和重命名新的一个旧的名称。
更新:

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion_tmp/")

   Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
    FileSystem  hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
    hdfs.delete("hdfs://localhost:8020/data/ingestion", isRecusrive);
    hdfs.rename("hdfs://localhost:8020/data/ingestion_tmp", "hdfs://localhost:8020/data/ingestion");

这里是到hdfs文件系统api文档的链接

相关问题