如何优化这个mapreduce函数,python,mrjob

2wnc66cl  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(436)

我对map/reduce原则和pythonmrjob框架非常陌生,我编写了这个示例代码,它工作得很好,但是我想知道我可以在其中做些什么来使它“完美”/更高效。

from mrjob.job import MRJob
import operator
import re

# append result from each reducer

output_words = []

class MRSudo(MRJob):

    def init_mapper(self):
        # move list of tuples across mapper
        self.words = []

    def mapper(self, _, line):
        command = line.split()[-1]
        self.words.append((command, 1))

    def final_mapper(self):
    for word_pair in self.words:
            yield word_pair

    def reducer(self, command, count): 
        # append tuples to the list
        output_words.append((command, sum(count)))

    def final_reducer(self):
        # Sort tuples in the list by occurence
        map(operator.itemgetter(1), output_words)
        sorted_words = sorted(output_words, key=operator.itemgetter(1), reverse=True)
        for result in sorted_words:
            yield result

    def steps(self):
        return [self.mr(mapper_init=self.init_mapper,
                        mapper=self.mapper,
                        mapper_final=self.final_mapper,
                        reducer=self.reducer,
                        reducer_final=self.final_reducer)]

if __name__ == '__main__':
    MRSudo.run()
64jmpszr

64jmpszr1#

有两种方法可以遵循。
1改进您的流程
您正在进行分布式字数统计。此操作是代数操作,但您没有利用此属性。
对于你输入的每一个单词,你都要向减缩器发送一个记录。这些字节必须进行分区,通过网络发送,然后由减速机排序。它既不高效,也不可伸缩,Map器发送给还原器的数据量通常是一个瓶颈。
你应该在工作中增加一个合路器。它将做完全相同的事情比你目前的减速机。合并器在同一地址空间中紧跟Map器运行。这意味着,通过网络发送的数据量不再与输入的字数成线性关系,而是受唯一字数的限制。通常要低几个数量级。
由于分布式字数计算示例被过度使用,您可以通过搜索“分布式字数合并器”轻松找到更多信息。所有代数运算都必须有一个组合器。
2使用更有效的工具
mrjob是一个很好的工具,可以快速编写map reduce作业。通常编写python作业比编写java作业要快。但是,它有运行时成本:
python通常比java慢
mrjob比大多数python框架都慢,因为is还没有使用 typedbytes 您必须决定是否值得使用常规api在java中重写某些作业。如果您正在编写长寿命的批处理作业,那么投资一些开发时间来降低运行时成本是有意义的。
从长远来看,编写java作业通常不会比用python编写长多少。但是你必须做一些前期投资:用构建系统创建一个项目,打包它,部署它等等。
cloudera在几个月前做了hadooppython框架的基准测试。mrjob比java作业慢了很多(5到7倍)。当typedbytes可用时,mrjob性能应该会提高,但是java作业仍然会快2到3倍。

相关问题