python从hdfs读取文件作为流

e3bfsja2  于 2021-06-03  发布在  Hadoop
关注(0)|答案(4)|浏览(689)

这里是我的问题:我在hdfs中有一个文件,它可能是巨大的(=不足以容纳所有的内存)
我想做的是避免将此文件缓存在内存中,并且像处理常规文件一样逐行处理它:

for line in open("myfile", "r"):
    # do some processing

我想看看是否有一个简单的方法来完成这项工作,而不使用外部库。我可能可以让它与libpyhdfs或python hdfs一起工作,但如果可能的话,我希望避免在系统中引入新的依赖项和未经测试的lib,特别是因为这两个看起来都没有严格维护,并且声明它们不应该在生产中使用。
我正在考虑使用标准的“hadoop”命令行工具和python来实现这一点 subprocess 模块,但我似乎不能做我需要的事情,因为没有命令行工具来做我的处理,我想以流方式为每一行执行python函数。
有没有办法使用subprocess模块将python函数作为管道的右操作数?或者更好的是,像一个文件一样打开它作为一个生成器,这样我就可以轻松地处理每一行了?

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)

如果有另一种方法可以实现我上面描述的,而不使用外部库,我也非常开放。
谢谢你的帮助!

piok6c0g

piok6c0g1#

在过去的两年里,hadoop流媒体有了很大的发展。根据cloudera的说法,这相当快:http://blog.cloudera.com/blog/2013/01/a-guide-to-python-frameworks-for-hadoop/ 我做得很成功。

hgtggwj0

hgtggwj02#

您可以使用webhdfs python库(构建在urllib3之上):

from hdfs import InsecureClient
client_hdfs = InsecureClient('http://host:port', user='root')
with client_hdfs.write(access_path) as writer:
    dump(records, writer)  # tested for pickle and json (doesnt work for joblib)

或者可以使用python中的requests包:

import requests
from json import dumps
params = (('op', 'CREATE')
('buffersize', 256))
data = dumps(file)  # some file or object - also tested for pickle library
response = requests.put('http://host:port/path', params=params, data=data)  # response 200 = successful

希望这有帮助!

hpxqektj

hpxqektj3#

如果需要xreadlines,它从文件中读取行,而不将整个文件加载到内存中。
编辑:
现在我明白你的问题了,你只需要把水管从你的嘴里拿出来 Popen 对象:

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in cat.stdout:
    print line
voj3qocg

voj3qocg4#

如果您想避免不惜任何代价添加外部依赖项,keith的答案是正确的。另一方面,pydoop可以让你的生活更轻松:

import pydoop.hdfs as hdfs
with hdfs.open('/user/myuser/filename') as f:
    for line in f:
        do_something(line)

关于您的问题,pydoop正在积极开发,并在crs4的生产中使用了多年,主要用于计算生物学应用。
西蒙尼

相关问题