从azure hdfs读取Parquet文件

huwehgph  于 2021-08-25  发布在  Java
关注(0)|答案(0)|浏览(318)

我正在尝试使用hadoop从azure data lake读取Parquet文件。以下代码工作正常(在java中):

URI uri =
        new URI("abfss://my-container@my-azure-account.blob.core.windows.net/some-path/2021/07/06/04-00-50/"
                        + "1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet");
HadoopInputFile file = HadoopInputFile.fromPath(new Path(uri), config);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord record;
while ((record = reader.read()) != null) {
    System.out.println(record);
}
reader.close();

但是,如果我将url中的“blob”替换为“dfs”,我会得到一个异常,它会说“不是Parquet地板文件”。尾部应为幻数“:

java.lang.RuntimeException: abfss://my-container@my-azure-account.dfs.core.windows.net/some-path/2021/07/06/04-00-50/1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet is not a Parquet file. Expected magic number at tail, but found [21, 0, 21, -82]
                at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:556)
                at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
                at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
                at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152)
                at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)

唯一的区别是地址中的dfs:abfss://my-container@my-azure-account.dfs.core.windows.net/some-path/2021/07/06/04-00-50/1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet。
我很确定这是一个基本的问题,但我对hadoop还不熟悉,所以我可能会忽略这一点。我做错了什么?
为了澄清,我之所以要使用dfs而不是blob url,是因为我需要读取给定日期的文件,但我不知道包含时间和文件名(可能也会有所不同)的路径的其余部分。为了找到它,我使用文件系统上的listfiles方法。完整代码如下所示:

URI uri = new URI("abfss://my-container@my-azure-account.blob.core.windows.net/some-path/2021/06/08");
FileSystem fs = path.getFileSystem(config);
RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(uri), true);
while (it.hasNext()) {
    LocatedFileStatus fileStatus = it.next();
    HadoopInputFile file = HadoopInputFile.fromStatus(fileStatus, config);
    ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
    reader.close();
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题