我想在aws中创建一个数据处理管道,最终将处理后的数据用于机器学习。
我有一个scala脚本,它从s3获取原始数据,对其进行处理并将其写入hdfs,甚至使用sparkcsv将其写入s3。如果我想使用aws机器学习工具来训练预测模型,我想我可以使用多个文件作为输入。但是如果我想使用其他东西,我认为最好是接收一个csv输出文件。
目前,由于我不想为了性能目的而使用重分区(1)或合并(1),我已经使用hadoop fs-getmerge进行手动测试,但是由于它只是合并作业输出文件的内容,所以我遇到了一个小问题。我需要在数据文件中的一行标题来训练预测模型。
如果我使用 .option("header","true")
对于spark csv,它将头文件写入每个输出文件,合并后,数据中的头文件行数与输出文件中的头文件行数相同。但如果header选项为false,则不会添加任何头。
现在我找到了一个将scala脚本中的文件与hadoopapi合并的选项 FileUtil.copyMerge
. 我试过这个 spark-shell
代码如下。
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
但是这个解决方案仍然只是将文件连接在一起,而不处理头文件。如何获得只有一行标题的输出文件?
我甚至试着添加 df.columns.mkString(",")
作为最后的论据 copyMerge
,但这仍然添加了多次头,而不是一次。
6条答案
按热度按时间axkjgtzd1#
使用dataframe.schema(val header=datadf.schema.fieldnames.reduce(“+”,“+”)
在dsefs上创建一个头文件
使用hadoop文件系统api将所有分区文件(无头)附加到#2中的文件
qzlgjiam2#
你可以这样走来走去。
1.创建包含标头名称的新Dataframe(headerdf)。
2.将其与包含数据的Dataframe(datadf)合并。
3.使用选项(“header”,“false”)将union-edDataframe输出到磁盘。
4.使用hadoop fileutil合并分区文件(part-0000**0.csv)
通过这种方式,除了单个分区的内容之外,所有分区都没有头,头df中有一行头名称。当所有分区合并在一起时,文件的顶部只有一个头。示例代码如下
pu3pd22g3#
尝试指定头的架构,并使用spark csv的drop malformed选项从文件夹中读取所有文件。这应该允许您读取文件夹中的所有文件,只保留标题(因为您删除了格式错误的文件)。例子:
在header_df中,您将只有头的行,从中您可以按照您需要的方式转换Dataframe。
mbjcgjjk4#
要将文件夹中的文件合并到一个文件中,请执行以下操作:
如果要将所有文件合并到一个文件中,但仍在同一文件夹中(但这会将所有数据带到驱动程序节点):
另一种解决方案是使用解决方案#2,然后将文件夹中的一个文件移动到另一个路径(使用我们的csv文件名)。
iq0todco5#
我们遇到了一个类似的问题,按照下面的方法获取单个输出文件-
将Dataframe写入带标头的hdfs,而不使用
coalesce
或者repartition
(转换后)。读取上一步中的文件,并使用
coalesce(1)
.这样,您将避免在执行转换时出现与合并或重新分区相关的性能问题(步骤1)。第二步提供带有一个标题行的单个输出文件。
u91tlkcl6#
请按照集成测试的链接了解如何编写单个头
http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/