newhadoopapi中的多个输入路径,用于spark读取lzo文件

xghobddn  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(644)

我正在开发一个spark应用程序,它必须从s3 bucket和hdfs读取多个目录(即多个路径)。我听说newhadoopapi提供了一种很好的方法,可以以一种性能良好的方式读取lzo压缩/索引文件。但是,如何使用newhadoopapi读取rdd中有多个lzo文件和索引文件的多个文件夹路径/目录?
文件夹结构类似于两列上的分区配置单元表。例:如下所示。按日期和批次划分
/rootdirectory/date=20161002/batch=5678/001\u 0.lzo/rootdirectory/date=20161002/batch=5678/001\u 0.lzo.index/rootdirectory/date=20161002/batch=5678/002\u 0.lzo/rootdirectory/date=20161002/batch=8765/001\u 0.lzo/rootdirectory/date=20161002/batch=8765/001\u 0.lzo.index/rootdirectory/date=20161002/batch=8765/002_0.lzo/rootdirectory/date=20161002/batch=8765/002_0.lzo.index
..... 等等。
现在我使用下面的代码从s3读取数据。这会将lzo和lzo.index文件都视为会使应用程序崩溃的输入,因为我不想读取.lzo.index文件,而只想读取使用索引的.lzo文件以提高速度。

val impInput = sparkSession.sparkContext.newAPIHadoopFile("s3://my-bucket/myfolder/*/*", classOf[NonSplittableTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
    val impRDD = impInput.map(_._2.toString)

有谁能帮我理解我该怎么做?
1). 使用newhadoopapi读取根目录下lzo文件的所有(多个)文件夹,这样我就可以利用.index文件了。
2). 以类似的方式从hdfs读取数据。

oxcyiej7

oxcyiej71#

为hdfs路径添加后缀可能会有所帮助。

val impInput = sparkSession.sparkContext.newAPIHadoopFile("s3://my-bucket/myfolder/*/*.lzo", classOf[NonSplittableTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])

相关问题