从azure hdfs读取Parquet文件

huwehgph  于 2021-08-25  发布在  Java
关注(0)|答案(0)|浏览(335)

我正在尝试使用hadoop从azure data lake读取Parquet文件。以下代码工作正常(在java中):

  1. URI uri =
  2. new URI("abfss://my-container@my-azure-account.blob.core.windows.net/some-path/2021/07/06/04-00-50/"
  3. + "1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet");
  4. HadoopInputFile file = HadoopInputFile.fromPath(new Path(uri), config);
  5. ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
  6. GenericRecord record;
  7. while ((record = reader.read()) != null) {
  8. System.out.println(record);
  9. }
  10. reader.close();

但是,如果我将url中的“blob”替换为“dfs”,我会得到一个异常,它会说“不是Parquet地板文件”。尾部应为幻数“:

  1. java.lang.RuntimeException: abfss://my-container@my-azure-account.dfs.core.windows.net/some-path/2021/07/06/04-00-50/1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet is not a Parquet file. Expected magic number at tail, but found [21, 0, 21, -82]
  2. at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:556)
  3. at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
  4. at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  5. at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:152)
  6. at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)

唯一的区别是地址中的dfs:abfss://my-container@my-azure-account.dfs.core.windows.net/some-path/2021/07/06/04-00-50/1.parquet/part-00000-360855b5-58c4-40ed-989f-a81c797cb008-c000.snappy.parquet。
我很确定这是一个基本的问题,但我对hadoop还不熟悉,所以我可能会忽略这一点。我做错了什么?
为了澄清,我之所以要使用dfs而不是blob url,是因为我需要读取给定日期的文件,但我不知道包含时间和文件名(可能也会有所不同)的路径的其余部分。为了找到它,我使用文件系统上的listfiles方法。完整代码如下所示:

  1. URI uri = new URI("abfss://my-container@my-azure-account.blob.core.windows.net/some-path/2021/06/08");
  2. FileSystem fs = path.getFileSystem(config);
  3. RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(uri), true);
  4. while (it.hasNext()) {
  5. LocatedFileStatus fileStatus = it.next();
  6. HadoopInputFile file = HadoopInputFile.fromStatus(fileStatus, config);
  7. ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
  8. GenericRecord record;
  9. while ((record = reader.read()) != null) {
  10. System.out.println(record);
  11. }
  12. reader.close();
  13. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题