from pyspark import SparkContext

sparkContext = SparkContext.getOrCreate()

# wholeTextFiles reads a directory of text files into a pair RDD of
# (file path, file content) tuples
rdd = sparkContext.wholeTextFiles(input_path)

def func_work_on_individual_files(x):
    # x is a (key, value) tuple for each element of the pair RDD:
    # key -> file path, value -> content of that file with lines
    # separated by '\n' (as you mentioned). Access the key as x[0]
    # and the value as x[1].
    # Put your logic to do something useful with the file data here;
    # to get separate lines you can use: x[1].split('\n')
    # End the function by returning whatever you want out of the
    # file's data. Here I simply return the whole content of the file.
    return x[1]

# apply the function to each (path, content) pair in the pair RDD created above
file_contents = rdd.map(func_work_on_individual_files)

# this will collapse all elements into a single partition (as you mentioned)
consolidated_contents = file_contents.repartition(1)

# save the final output - this writes a single Hadoop-style part file;
# output_path must differ from input_path and must not already exist
consolidated_contents.saveAsTextFile(output_path)
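As a design note: if the goal is just one output file, coalesce(1) produces the same result as repartition(1) but avoids a full shuffle, which is usually cheaper when you are only reducing the partition count:

# alternative to repartition(1): merge existing partitions without a full shuffle
consolidated_contents = file_contents.coalesce(1)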
2 Answers

Answer 1:
pyspark provides a function for exactly this use case:
sparkContext.wholeTextFiles(path)
It will read a directory of text files and produce key-value pairs, where the key is the path of each file and the value is the content of that file.
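For example, a minimal sketch of what those pairs look like (the directory name "data/" and its contents are hypothetical, and a running SparkContext is assumed):

from pyspark import SparkContext

sparkContext = SparkContext.getOrCreate()

# read every file under the (hypothetical) directory "data/"
pairs = sparkContext.wholeTextFiles("data/")

# each element is a (file path, file content) tuple, e.g.
# ('file:/.../data/a.txt', 'line1\nline2')
for file_path, content in pairs.collect():
    print(file_path, "->", len(content.split('\n')), "lines")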
Answer 2:
I hope this clears up some of your confusion:
sparkContext.wholeTextFiles(path)
returns a pair RDD
(helpful link: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html). In short, a pair RDD is more like a map (i.e., it has keys and values).
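To make the map-like behaviour concrete, here is a minimal sketch of the key/value operations a pair RDD supports (the "data/" directory is hypothetical, and a running SparkContext is assumed):

from pyspark import SparkContext

sparkContext = SparkContext.getOrCreate()
pair_rdd = sparkContext.wholeTextFiles("data/")

# pair RDDs expose map-like operations keyed on the first tuple element
paths = pair_rdd.keys()        # RDD of file paths only
contents = pair_rdd.values()   # RDD of file contents only

# mapValues transforms each value while leaving its key untouched,
# e.g. replacing each file's content with its line count
line_counts = pair_rdd.mapValues(lambda text: len(text.split('\n')))
print(line_counts.collect())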