如何在spark中写入csv

gpnt7bae  于 2021-06-03  发布在  Hadoop
关注(0)|答案(5)|浏览(1115)

我正在试图找到一种有效的方法来保存我的spark作业的结果为csv文件。我在hadoop中使用spark,到目前为止我所有的文件都保存为 part-00000 .
有没有办法让我的spark保存到指定文件名的文件?

z9smfwbn

z9smfwbn1#

我建议这样做(java示例):

theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
    fs, new Path(textFileName),
    fs, new Path(textFileNameDestiny),
    true, fs.getConf(), null);
mrzz3bfm

mrzz3bfm2#

我有一个想法,但还没有准备好代码片段。内部(顾名思义)spark使用hadoop输出格式(以及 InputFormat 从hdfs读取时)。
在hadoop的 FileOutputFormat 存在受保护的成员 setOutputFormat ,您可以从继承的类中调用它来设置其他基名称。

tjjdgumg

tjjdgumg3#

将如来的答案扩展到spark 2.x和scala 2.11

使用sparksql,我们可以在一行中完成这项工作

//implicits for magic functions like .toDf
import spark.implicits._

val df = Seq(
  ("first", 2.0),
  ("choose", 7.0),
  ("test", 1.5)
).toDF("name", "vals")

//write DataFrame/DataSet to external storage
df.write
  .format("csv")
  .save("csv/file/location")

然后你就可以直接回答阿多隆索的问题了。

lvmkulzt

lvmkulzt4#

由于spark使用hadoop文件系统api将数据写入文件,这是不可避免的。如果你这样做了

rdd.saveAsTextFile("foo")

它将另存为“ foo/part-XXXXX “使用一个part-*文件,您要保存的rdd中的每个分区。rdd中的每个分区被写入一个单独的文件的原因是为了容错。如果任务写入第三个分区(即 part-00002 )如果失败,spark只需重新运行任务并覆盖部分写入/损坏的 part-00002 ,对其他部位无影响。如果它们都写在同一个文件中,那么就很难恢复单个任务的失败。
这个 part-XXXXX 如果您打算在基于spark/hadoop的框架中再次使用它,文件通常不是问题,因为它们都使用hdfsapi,如果您要求它们读取“foo”,它们都将读取所有 part-XXXXX foo中的文件。

62o28rlo

62o28rlo5#

这不是一个真正干净的解决方案,但在一个 foreachRDD ()您基本上可以做任何您喜欢的事情,也可以创建一个新文件。
在我的解决方案中,我就是这样做的:我将输出保存在hdfs上(出于容错的原因),并保存在 foreachRDD 我还在本地文件夹中创建一个包含统计信息的tsv文件。
如果你需要的话,我想你也可以这么做。
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-操作

相关问题