我想在pyspark中从/到工作机(函数)到hdfs高效地保存/读取numpy数组。我有两台机器a和b。a有主人和工人。b有一个工人。例如,我想实现以下目标:
if __name__ == "__main__":
conf = SparkConf().setMaster("local").setAppName("Test")
sc = SparkContext(conf = conf)
sc.parallelize([0,1,2,3], 2).foreachPartition(func)
def func(iterator):
P = << LOAD from HDFS or Shared Memory as numpy array>>
for x in iterator:
P = P + x
<< SAVE P (numpy array) to HDFS/ shared file system >>
有什么快速有效的方法可以做到这一点?
1条答案
按热度按时间b91juud31#
我偶然发现了同样的问题。最后在python3.4中使用了hdfscli模块和tempfiles。
进口:
创建hdfs客户机。在大多数情况下,最好在脚本中的某个地方有一个实用函数,例如:
在worker函数中加载并保存numpy:
笔记:
用于创建hdfs客户机的uri以
http://
,因为它使用hdfs文件系统的web接口;确保传递给hdfs客户机的用户具有读写权限
根据我的经验,开销并不显著(至少就执行时间而言)
使用tempfiles(与常规文件相比)的优势
/tmp
)是否确保在脚本结束后,集群计算机中没有垃圾文件(正常或不正常)