使用spark rdd保存和加载wholetextfiles

2lpgd968  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(481)

我需要在spark中执行一些文本文件的批处理。基本上有人给了我成吨的csv文件是畸形的。它们包含多行任意文本格式的标题数据,然后是多行格式正确的csv数据。我需要把这些数据分成两个文件,或者至少去掉头文件。
不管怎样,我读到你可以得到一个rdd格式:
[(文件名,内容)]
通过使用
spark.sparkcontext.wholetextfiles(输入文件\u csv)
然后我想在这个rdd上执行一个Map操作,这个操作会产生另一种与原始格式完全相同的格式
[(新文件名,内容)]
然后我希望集群将这些内容保存在这些文件名下。
我找不到可以为我执行此操作的write命令。我可以将rdd原始保存,但不能将其保存为普通文件,以后可以将其读取为Dataframe。
我想我可以删除标题,然后保存为一个单一的巨大csv文件名作为一个新的列,但我觉得这不会是有效的。
有人能解决我的问题吗?

k3fezbri

k3fezbri1#

这是scala,但在python中应该不会太远。在“foreach”中,我没有使用任何特定于spark的东西来编写文件,只是使用常规的hadoop api。

  1. sc.wholeTextFiles("/tmp/test-data/")
  2. .foreach{ x =>
  3. val filename = x._1
  4. val content = x._2
  5. val fs = FileSystem.get(new Configuration())
  6. val output = fs.create(new Path(s"${filename}-copy"))
  7. val writer = new PrintWriter(output)
  8. writer.write(content)
  9. writer.close
  10. }

相关问题