如何将numpy数组从pyspark worker保存到hdfs或共享文件系统?

evrscar2  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(652)

我想在pyspark中从/到工作机(函数)到hdfs高效地保存/读取numpy数组。我有两台机器a和b。a有主人和工人。b有一个工人。例如,我想实现以下目标:

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("Test")
    sc = SparkContext(conf = conf)
    sc.parallelize([0,1,2,3], 2).foreachPartition(func)

def func(iterator):
    P = << LOAD from HDFS or Shared Memory as numpy array>>
    for x in iterator:
        P = P + x

    << SAVE P (numpy array) to HDFS/ shared file system >>

有什么快速有效的方法可以做到这一点?

b91juud3

b91juud31#

我偶然发现了同样的问题。最后在python3.4中使用了hdfscli模块和tempfiles。
进口:

from hdfs import InsecureClient
from tempfile import TemporaryFile

创建hdfs客户机。在大多数情况下,最好在脚本中的某个地方有一个实用函数,例如:

def get_hdfs_client():
    return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
         root="<hdfs base path>")

在worker函数中加载并保存numpy:

hdfs_client = get_hdfs_client()

# load from file.npy

path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()

with hdfs_client.read(path) as reader:
    tf.write(reader.read())
    tf.seek(0) # important, set cursor to beginning of file

np_array = numpy.load(tf)

...

# save to file.npy

tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file

# with overwrite=False, an exception is thrown if the file already exists

hdfs_client.write("/whatever/output/file.npy", tf.read(),  overwrite=True)

笔记:
用于创建hdfs客户机的uri以 http:// ,因为它使用hdfs文件系统的web接口;
确保传递给hdfs客户机的用户具有读写权限
根据我的经验,开销并不显著(至少就执行时间而言)
使用tempfiles(与常规文件相比)的优势 /tmp )是否确保在脚本结束后,集群计算机中没有垃圾文件(正常或不正常)

相关问题