Python:如何从HDFS导入目录中的文件列表

kb5ga3dv  于 2023-05-27  发布在  HDFS
关注(0)|答案(2)|浏览(275)

我尝试在python中从HDFS导入文件列表。
如何从HDFS做到这一点:

path =r'/my_path'
allFiles = glob.glob(path + "/*.csv")

df_list = []
for file_ in allFiles:
    df = pd.read_csv(file_,index_col=None, header=0,sep=';')    
    df_list.append(df)

我认为subprocess.Popen可以做到这一点,但是如何只提取文件名呢?

import subprocess
p = subprocess.Popen("hdfs dfs -ls /my_path/ ",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT)

for line in p.stdout.readlines():
    print(line)

输出如下:

b'Found 32 items\n'
b'-rw-------   3 user hdfs   42202621 2019-01-21 10:05 /my_path/file1.csv\n'
b'-rw-------   3 user hdfs   99320020 2019-01-21 10:05 /my_path/file2.csv\n'
os8fio9y

os8fio9y1#

声明人:这将是一个漫长而乏味的。但考虑到这种情况,我将尽可能使它具有通用性和可重复性。

考虑到没有外部库的要求(除了pandas?),没有必须的选择。我建议尽可能多地使用WebHDFS
AFAIK,HDFS 的安装默认包括 WebHDFS 的安装。以下解决方案严重依赖于 WebHDFS

第一步

开始,您必须了解 WebHDFS URL。WebHDFS 安装在HDFS Namenode上,默认端口为50070
因此,我们从http://[namenode_ip]:50070/webhdfs/v1/开始,其中/webhdfs/v1/是所有URL的公共URL。
为了举例,我们假设它是http://192.168.10.1:50070/web/hdfs/v1

第二步

通常,可以使用curl来列出HDFS目录的内容。有关详细说明,请参阅WebHDFS REST API:列出目录
如果您要使用curl,下面提供了给定目录中所有文件的FileStatuses

curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
             ^^^^^^^^^^^^ ^^^^^             ^^^^  ^^^^^^^^^^^^^
             Namenode IP  Port              Path  Operation

如上所述,这将返回JSON对象中的FileStatuses:

{
  "FileStatuses":
  {
    "FileStatus":
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
  }
}

使用python的默认库可以实现相同的结果:

import requests

my_path = '/my_path/'
curl = requests.get('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path)

如上所示,每个文件的实际状态比结果JSON低两个级别。换句话说,要获取每个文件的FileStatus:

curl.json()['FileStatuses']['FileStatus'] 

[
  {
    "accessTime"      : 1320171722771,
    "blockSize"       : 33554432,
    "group"           : "supergroup",
    "length"          : 24930,
    "modificationTime": 1320171722771,
    "owner"           : "webuser",
    "pathSuffix"      : "a.patch",
    "permission"      : "644",
    "replication"     : 1,
    "type"            : "FILE"
  },
  {
    "accessTime"      : 0,
    "blockSize"       : 0,
    "group"           : "supergroup",
    "length"          : 0,
    "modificationTime": 1320895981256,
    "owner"           : "szetszwo",
    "pathSuffix"      : "bar",
    "permission"      : "711",
    "replication"     : 0,
    "type"            : "DIRECTORY"
  },
  ...
]

第三步

既然您现在拥有了所需的所有信息,那么您所需要做的就是解析。

import os

file_paths = []
for file_status in curl.json()['FileStatuses']['FileStatus']:
    file_name = file_status['pathSuffix']
    # this is the file name in the queried directory
    if file_name.endswith('.csv'):
    # if statement is only required if the directory contains unwanted files (i.e. non-csvs).
        file_paths.append(os.path.join(path, file_name))
        # os.path.join asserts your result consists of absolute path

file_paths
['/my_path/file1.csv',
 '/my_path/file2.csv',
 ...]

最终步骤

现在你知道文件和WebHDFS链接的路径,pandas.read_csv可以处理剩下的工作。

import pandas as pd

dfs = []
web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
#                                                  ^^^^^^^
#                                    Operation is now OPEN
for file_path in file_paths:
    file_url = web_url % file_path
    # http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
    dfs.append(pd.read_csv(file_url))

现在,所有的.csv都导入并分配给了dfs

警告

如果您的HDFS配置为HA(高可用性),则会有多个namenode,因此您的namenode_ip必须相应设置:它必须是活动节点的IP。

ryhaxcpt

ryhaxcpt2#

对这个问题的公认答案是我所能想象到的最糟糕的可能方法,是反人类罪。你可以用pyarrow读写。语法如下:

from pyarrow import fs
import pyarrow.parquet as pq

# connect to hadoop
hdfs = fs.HadoopFileSystem('hostname', 8020) 

# will read single file from hdfs
with hdfs.open_input_file(path) as pqt:
     df = pq.read_table(pqt).to_pandas()

# will read directory full of partitioned parquets (ie. from spark)
df = pq.ParquetDataset(path, hdfs).read().to_pandas()

相关问题