sparkDataframe并行

quhf5bfb  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(358)

下面是我使用apachespark的用例
1) 我有大约2500个Parquet文件在hdfs上,文件大小因文件而异。
2) 我需要处理每一个Parquet文件,建立一个新的Dataframe,并将一个新的Dataframe写入orc文件格式。
3) 我的spark驱动程序是这样的。我迭代每个文件,处理单个Parquet文件,创建一个新的Dataframe,并将一个新的Dataframe编写为orc,下面是代码片段。

val fs = FileSystem.get(new Configuration())
  val parquetDFMap = fs.listStatus(new Path(inputFilePath)).map(folder => {
  (folder.getPath.toString, sqlContext.read.parquet(folder.getPath.toString))})

parquetDFMap.foreach {
  dfMap =>
    val parquetFileName = dfMap._1
    val parqFileDataFrame = dfMap._2
    for (column <- parqFileDataFrame.columns) 
    {
       val rows = parqFileDataFrame.select(column)
            .mapPartitions(lines => lines.filter(filterRowsWithNullValues(_))
            .map(row => buildRowRecords(row, masterStructArr.toArray, valuesArr)))
        val newDataFrame: DataFrame = parqFileDataFrame.sqlContext.createDataFrame(rows, StructType(masterStructArr))
       newDataFrame.write.mode(SaveMode.Append).format("orc").save(orcOutPutFilePath+tableName)
    }
}

这个设计的问题是我只能及时处理一个Parquet文件,只有当我创建一个新的Dataframe,并且当新的Dataframe被写入orc格式时,才会应用并行性。因此,如果创建新的Dataframe或将新的Dataframe写入orc等任务需要很长时间才能完成其他排列的Parquet地板处理,那么在当前Parquet地板操作完成之前,这些任务都会被卡住。
你能帮我为这个用例提供一个更好的方法或设计吗。

kdfy810k

kdfy810k1#

我能够通过并行 parquetDFMap.foreach.par

nukf8bse

nukf8bse2#

您能为所有Parquet文件创建一个Dataframe而不是为每个文件创建一个Dataframe吗

val df =  sqlContext.read.parquet(inputFilePath)
df.map(row => convertToORc(row))

相关问题