我正在尝试使用hadoop从azure data lake读取Parquet文件。以下代码工作正常(在java中):
URI uri =
new URI("abfss://my-container@my-azure-account.blob.core.windows.net/some-path/2021/07/06/04-00-50/"
+ "1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet");
HadoopInputFile file = HadoopInputFile.fromPath(new Path(uri), config);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord record;
while ((record = reader.read()) != null) {
System.out.println(record);
}
reader.close();
但是,如果我将url中的“blob”替换为“dfs”,我会得到一个异常,它会说“不是Parquet地板文件”。尾部应为幻数“:
java.lang.RuntimeException: abfss://my-container@my-azure-account.dfs.core.windows.net/some-path/2021/07/06/04-00-50/1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet is not a Parquet file. Expected magic number at tail, but found [21, 0, 21, -82]
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:556)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
唯一的区别是地址中的dfs:abfss://my-container@my-azure-account.dfs.core.windows.net/some-path/2021/07/06/04-00-50/1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet。
我很确定这是一个基本的问题,但我对hadoop还不熟悉,所以我可能会忽略这一点。我做错了什么?
为了澄清,我之所以要使用dfs而不是blob url,是因为我需要读取给定日期的文件,但我不知道包含时间和文件名(可能也会有所不同)的路径的其余部分。为了找到它,我使用文件系统上的listfiles方法。完整代码如下所示:
URI uri = new URI("abfss://my-container@my-azure-account.blob.core.windows.net/some-path/2021/06/08");
FileSystem fs = path.getFileSystem(config);
RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(uri), true);
while (it.hasNext()) {
LocatedFileStatus fileStatus = it.next();
HadoopInputFile file = HadoopInputFile.fromStatus(fileStatus, config);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord record;
while ((record = reader.read()) != null) {
System.out.println(record);
}
reader.close();
}
暂无答案!
目前还没有任何答案,快来回答吧!