将spark输出csv文件与单个头合并

cld4siwp  于 2021-06-02  发布在  Hadoop
关注(0)|答案(6)|浏览(600)

我想在aws中创建一个数据处理管道,最终将处理后的数据用于机器学习。
我有一个scala脚本,它从s3获取原始数据,对其进行处理并将其写入hdfs,甚至使用sparkcsv将其写入s3。如果我想使用aws机器学习工具来训练预测模型,我想我可以使用多个文件作为输入。但是如果我想使用其他东西,我认为最好是接收一个csv输出文件。
目前,由于我不想为了性能目的而使用重分区(1)或合并(1),我已经使用hadoop fs-getmerge进行手动测试,但是由于它只是合并作业输出文件的内容,所以我遇到了一个小问题。我需要在数据文件中的一行标题来训练预测模型。
如果我使用 .option("header","true") 对于spark csv,它将头文件写入每个输出文件,合并后,数据中的头文件行数与输出文件中的头文件行数相同。但如果header选项为false,则不会添加任何头。
现在我找到了一个将scala脚本中的文件与hadoopapi合并的选项 FileUtil.copyMerge . 我试过这个 spark-shell 代码如下。

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

但是这个解决方案仍然只是将文件连接在一起,而不处理头文件。如何获得只有一行标题的输出文件?
我甚至试着添加 df.columns.mkString(",") 作为最后的论据 copyMerge ,但这仍然添加了多次头,而不是一次。

axkjgtzd

axkjgtzd1#

使用dataframe.schema(val header=datadf.schema.fieldnames.reduce(“+”,“+”)
在dsefs上创建一个头文件
使用hadoop文件系统api将所有分区文件(无头)附加到#2中的文件

qzlgjiam

qzlgjiam2#

你可以这样走来走去。
1.创建包含标头名称的新Dataframe(headerdf)。
2.将其与包含数据的Dataframe(datadf)合并。
3.使用选项(“header”,“false”)将union-edDataframe输出到磁盘。
4.使用hadoop fileutil合并分区文件(part-0000**0.csv)
通过这种方式,除了单个分区的内容之外,所有分区都没有头,头df中有一行头名称。当所有分区合并在一起时,文件的顶部只有一个头。示例代码如下

//dataFrame is the data to save on disk
  //cast types of all columns to String
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  //create a new data frame containing only header names
  import scala.collection.JavaConverters._
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  //merge header names with data
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  //use hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
pu3pd22g

pu3pd22g3#

尝试指定头的架构,并使用spark csv的drop malformed选项从文件夹中读取所有文件。这应该允许您读取文件夹中的所有文件,只保留标题(因为您删除了格式错误的文件)。例子:

val headerSchema = List(
  StructField("example1", StringType, true),
  StructField("example2", StringType, true),
  StructField("example3", StringType, true)
)

val header_DF =sqlCtx.read
  .option("delimiter", ",")
  .option("header", "false")
  .option("mode","DROPMALFORMED")
  .option("inferSchema","false")
  .schema(StructType(headerSchema))
  .format("com.databricks.spark.csv")
  .load("folder containg the files")

在header_df中,您将只有头的行,从中您可以按照您需要的方式转换Dataframe。

mbjcgjjk

mbjcgjjk4#

要将文件夹中的文件合并到一个文件中,请执行以下操作:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

如果要将所有文件合并到一个文件中,但仍在同一文件夹中(但这会将所有数据带到驱动程序节点):

dataFrame
      .coalesce(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(out)

另一种解决方案是使用解决方案#2,然后将文件夹中的一个文件移动到另一个路径(使用我们的csv文件名)。

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
    val tmpDir = "tmpDir"

    df.repartition(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", header.toString)
      .option("delimiter", sep)
      .save(tmpDir)

    val dir = new File(tmpDir)
    val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
    (new File(tmpCsvFile)).renameTo(new File(fileName))

    dir.listFiles.foreach( f => f.delete )
    dir.delete
}
iq0todco

iq0todco5#

我们遇到了一个类似的问题,按照下面的方法获取单个输出文件-
将Dataframe写入带标头的hdfs,而不使用 coalesce 或者 repartition (转换后)。

dataframe.write.format("csv").option("header", "true").save(hdfs_path_for_multiple_files)

读取上一步中的文件,并使用 coalesce(1) .

dataframe = spark.read.option('header', 'true').csv(hdfs_path_for_multiple_files)

dataframe.coalesce(1).write.format('csv').option('header', 'true').save(hdfs_path_for_single_file)

这样,您将避免在执行转换时出现与合并或重新分区相关的性能问题(步骤1)。第二步提供带有一个标题行的单个输出文件。

u91tlkcl

u91tlkcl6#

// Convert JavaRDD  to CSV and save as text file
        outputDataframe.write()
                .format("com.databricks.spark.csv")
                // Header => true, will enable to have header in each file
                .option("header", "true")

请按照集成测试的链接了解如何编写单个头
http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/

相关问题