pyspark:获取hdfs路径上的文件/目录列表

xytpbqjk  于 2021-06-02  发布在  Hadoop
关注(0)|答案(6)|浏览(2701)

根据标题。我知道 textFile 但是,顾名思义,它只适用于文本文件。我需要访问hdfs或本地路径上路径中的文件/目录。我用的是Pypark。

ct3nt3jp

ct3nt3jp1#

如果使用pyspark,则可以交互执行命令:
列出所选目录中的所有文件: hdfs dfs -ls <path> 例如。: hdfs dfs -ls /user/path :

import os
import subprocess

cmd = 'hdfs dfs -ls /user/path'
files = subprocess.check_output(cmd, shell=True).strip().split('\n')
for path in files:
  print path

或在选定目录中搜索文件: hdfs dfs -find <path> -name <expression> 例如。: hdfs dfs -find /user/path -name *.txt :

import os
import subprocess

cmd = 'hdfs dfs -find {} -name *.txt'.format(source_dir)
files = subprocess.check_output(cmd, shell=True).strip().split('\n')
for path in files:
  filename = path.split(os.path.sep)[-1].split('.txt')[0]
  print path, filename
3lxsmp7m

3lxsmp7m2#

使用jvm网关可能不那么优雅,但在某些情况下,下面的代码可能会有所帮助:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())
mwngjboj

mwngjboj3#

我相信把spark仅仅看作一个数据处理工具是有帮助的,它的域从加载数据开始。它可以读取许多格式,并且支持hadoop glob表达式,这对于从hdfs中的多个路径读取非常有用,但是它没有我所知道的用于遍历目录或文件的内置工具,也没有特定于与hadoop或hdfs交互的实用工具。
有一些可用的工具可以做您想做的事情,包括esutil和hdfs。hdfs lib支持cli和api,您可以直接跳到这里的“howdoilistfilesinpython”。看起来是这样的:

from hdfs import Config
client = Config().get_client('dev')
files = client.list('the_dir_path')
dfddblmv

dfddblmv4#

如果要读入目录中的所有文件,请 checkout sc.wholeTextFiles [doc],但请注意,文件的内容被读入一行的值中,这可能不是期望的结果。
如果您只想读取一些文件,那么生成一个路径列表(使用一个普通的hdfs-ls命令加上您需要的任何过滤)并将其传递到 sqlContext.read.text [doc]然后从 DataFrame 到一个 RDD 似乎是最好的办法。

vwoqyblh

vwoqyblh5#

有一个简单的方法来做到这一点,使用蛇咬图书馆

from snakebite.client import Client

hadoop_client = Client(HADOOP_HOST, HADOOP_PORT, use_trash=False)

for x in hadoop_client.ls(['/']):

...     print x
7xzttuei

7xzttuei6#

这可能适合您:

import subprocess, re
def listdir(path):
    files = str(subprocess.check_output('hdfs dfs -ls ' + path, shell=True))
    return [re.search(' (/.+)', i).group(1) for i in str(files).split("\\n") if re.search(' (/.+)', i)]

listdir('/user/')

这也起到了作用:

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('/user/')
[str(f.getPath()) for f in fs.get(conf).listStatus(path)]

相关问题