我对map/reduce原则和pythonmrjob框架非常陌生,我编写了这个示例代码,它工作得很好,但是我想知道我可以在其中做些什么来使它“完美”/更高效。
from mrjob.job import MRJob
import operator
import re
# append result from each reducer
output_words = []
class MRSudo(MRJob):
def init_mapper(self):
# move list of tuples across mapper
self.words = []
def mapper(self, _, line):
command = line.split()[-1]
self.words.append((command, 1))
def final_mapper(self):
for word_pair in self.words:
yield word_pair
def reducer(self, command, count):
# append tuples to the list
output_words.append((command, sum(count)))
def final_reducer(self):
# Sort tuples in the list by occurence
map(operator.itemgetter(1), output_words)
sorted_words = sorted(output_words, key=operator.itemgetter(1), reverse=True)
for result in sorted_words:
yield result
def steps(self):
return [self.mr(mapper_init=self.init_mapper,
mapper=self.mapper,
mapper_final=self.final_mapper,
reducer=self.reducer,
reducer_final=self.final_reducer)]
if __name__ == '__main__':
MRSudo.run()
1条答案
按热度按时间64jmpszr1#
有两种方法可以遵循。
1改进您的流程
您正在进行分布式字数统计。此操作是代数操作,但您没有利用此属性。
对于你输入的每一个单词,你都要向减缩器发送一个记录。这些字节必须进行分区,通过网络发送,然后由减速机排序。它既不高效,也不可伸缩,Map器发送给还原器的数据量通常是一个瓶颈。
你应该在工作中增加一个合路器。它将做完全相同的事情比你目前的减速机。合并器在同一地址空间中紧跟Map器运行。这意味着,通过网络发送的数据量不再与输入的字数成线性关系,而是受唯一字数的限制。通常要低几个数量级。
由于分布式字数计算示例被过度使用,您可以通过搜索“分布式字数合并器”轻松找到更多信息。所有代数运算都必须有一个组合器。
2使用更有效的工具
mrjob是一个很好的工具,可以快速编写map reduce作业。通常编写python作业比编写java作业要快。但是,它有运行时成本:
python通常比java慢
mrjob比大多数python框架都慢,因为is还没有使用
typedbytes
您必须决定是否值得使用常规api在java中重写某些作业。如果您正在编写长寿命的批处理作业,那么投资一些开发时间来降低运行时成本是有意义的。从长远来看,编写java作业通常不会比用python编写长多少。但是你必须做一些前期投资:用构建系统创建一个项目,打包它,部署它等等。
cloudera在几个月前做了hadooppython框架的基准测试。mrjob比java作业慢了很多(5到7倍)。当typedbytes可用时,mrjob性能应该会提高,但是java作业仍然会快2到3倍。