使用python从hdfs获取文件名列表

toiithl6  于 2021-06-03  发布在  Hadoop
关注(0)|答案(5)|浏览(721)

这里是hadoop noob。
我搜索了一些关于hadoop和python入门的教程,但没有获得太多成功。我还不需要对Map器和还原器做任何工作,但这更像是一个访问问题。
作为hadoop集群的一部分,hdfs上有一堆.dat文件。
为了使用python访问我的客户机(本地计算机)上的那些文件,
我的电脑上需要有什么?
如何在hdfs上查询文件名?
任何链接也会有帮助。

g52tjvyc

g52tjvyc1#

我的电脑上需要有什么?
您需要安装并运行hadoop,当然还需要python。
如何在hdfs上查询文件名?
你可以试试这个。我还没有测试过代码,所以不要依赖它。

from subprocess import Popen, PIPE

process = Popen('hdfs dfs -cat filename.dat',shell=True,stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()

check for returncode, std_err
if:
    everything is OK, do whatever with stdout
else:
    do something in else condition

您还可以看看pydoop,它是一个用于hadoop的pythonapi。
尽管我的例子包括 shell=true ,您可以尝试在没有它的情况下运行,因为它存在安全风险。为什么你不应该使用 shell=True ?

wi3ka0sx

wi3ka0sx2#

据我所知,目前还没有现成的解决方案,我找到的大多数答案都是通过调用 hdfs 命令。我在linux上运行,也面临同样的挑战。我找到了 sh 包是有用的。这将为您处理运行o/s命令和管理stdin/out/err。
更多信息请参见此处:https://amoffat.github.io/sh/
不是最整洁的解决方案,但它是一行(ish)和使用标准包。
下面是我的代码截取一个hdfs目录列表。它将列出相似的文件和文件夹,因此如果需要区分它们,您可能需要修改它们。

import sh
hdfsdir = '/somedirectory'
filelist = [ line.rsplit(None,1)[-1] for line in sh.hdfs('dfs','-ls',hdfsdir).split('\n') if len(line.rsplit(None,1))][1:]

我的输出-在本例中,这些都是目录:

[u'/somedirectory/transaction_basket_fct/date_id=2015-01-01',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-02',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-03',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-04',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-05',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-06',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-07',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-08']

我们来分解一下:
运行 hdfs dfs -ls /somedirectory 命令我们可以使用 sh Package 如下:

import sh
sh.hdfs('dfs','-ls',hdfsdir)
``` `sh` 允许您无缝调用o/s命令,就像它们是模块上的函数一样。将命令参数作为函数参数传递。非常整洁。
对我来说,它的回报是:

Found 366 items
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-01
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-02
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-03
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-04
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-05

根据新行字符使用 `.split('\n')` 使用获取字符串中的最后一个“单词” `line.rsplit(None,1)[-1]` .
要防止列表中的空元素出现问题,请使用 `if len(line.rsplit(None,1))` 最后删除列表中的第一个元素( `Found 366 items` )使用 `[1:]` 
vulvrdjw

vulvrdjw3#

正如jgc所说的,您可以做的最简单的事情就是从登录(via)开始 ssh )其中一个节点(参与hadoop群集的服务器)并验证您是否具有正确的访问控制和权限:
使用hdfs客户机列出您的主目录。 hdfs dfs -ls 列出生活在hdfs中的感兴趣的目录。 hdfs dfs -ls <absolute or relative path to HDFS directory> 然后,在python中,应该使用子流程和hdfs客户机来访问感兴趣的路径,并使用 -C 标记以排除不必要的元数据(以避免以后进行丑陋的后处理)。
即。 Popen(['hdfs', 'dfs', '-ls', '-C', dirname]) 之后,在新行上拆分输出,然后您将得到路径列表。
下面是一个示例以及日志记录和错误处理(包括当目录/文件不存在时):

from subprocess import Popen, PIPE
import logging
logger = logging.getLogger(__name__)

FAILED_TO_LIST_DIRECTORY_MSG = 'No such file or directory'

class HdfsException(Exception):
    pass

def hdfs_ls(dirname):
    """Returns list of HDFS directory entries."""
    logger.info('Listing HDFS directory ' + dirname)
    proc = Popen(['hdfs', 'dfs', '-ls', '-C', dirname], stdout=PIPE, stderr=PIPE)
    (out, err) = proc.communicate()
    if out:
        logger.debug('stdout:\n' + out)
    if proc.returncode != 0:
        errmsg = 'Failed to list HDFS directory "' + dirname + '", return code ' + str(proc.returncode)
        logger.error(errmsg)
        logger.error(err)
        if not FAILED_TO_LIST_DIRECTORY_MSG in err:
            raise HdfsException(errmsg)
        return []
    elif err:
        logger.debug('stderr:\n' + err)
    return out.splitlines()

# dat_files will contain a proper Python list of the paths to the '.dat' files you mentioned above.

dat_files = hdfs_ls('/hdfs-dir-with-dat-files/')
pgky5nke

pgky5nke4#

对于仅使用Python3的原始子进程库的“在hdfs上查询文件名”:

from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().split('\n')[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().split('\n')[1:]][:-1]
xqk2d5yq

xqk2d5yq5#

您应该具有群集中某个节点的登录权限。让集群管理员选择节点并设置帐户,并通知您如何安全地访问节点。如果您是管理员,请告诉我群集是本地的还是远程的,如果是远程的,那么它是托管在您的计算机上、公司内部还是第三方云上,如果是谁的,那么我可以提供更多相关信息。
要在hdfs中查询文件名,请登录到集群节点并运行 hadoop fs -ls [path] . 路径是可选的,如果未提供,则会列出主目录中的文件。如果 -R 作为选项提供,然后递归列出路径中的所有文件。此命令还有其他选项。有关此命令和其他hadoop文件系统shell命令的更多信息,请参见http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystemshell.html.
在python中查询hdfs文件名的一种简单方法是 esutil.hdfs.ls(hdfs_url='', recurse=False, full=False) ,执行 hadoop fs -ls hdfs_url 在子进程中,它还具有许多其他hadoop文件系统shell命令的函数(请参阅http://code.google.com/p/esutil/source/browse/trunk/esutil/hdfs.py). esutil可以与一起安装 pip install esutil . 在pypi上https://pypi.python.org/pypi/esutil,文档位于http://code.google.com/p/esutil/ 它的github站点是https://github.com/esheldon/esutil.

相关问题