HDFS 如何使用json文件的多步mrjob

omqzjyyz  于 2022-12-09  发布在  HDFS
关注(0)|答案(1)|浏览(169)

我试着用hadoop从一个json文件中得到一些统计数据,比如某个类别或语言的平均星级数。为了做到这一点,我用mrjob,我找到了这个code

import re

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):
FILES = ['stop_words.txt']

OUTPUT_PROTOCOL = JSONValueProtocol

def configure_args(self):
    super(MRMostUsedWord, self).configure_args()

    # allow for alternate stop words file
    self.add_file_arg(
        '--stop-words-file',
        dest='stop_words_file',
        default=None,
        help='alternate stop words file. lowercase words, one per line',
    )

def mapper_init(self):
    stop_words_path = self.options.stop_words_file or 'stop_words.txt'

    with open(stop_words_path) as f:
        self.stop_words = set(line.strip() for line in f)

def mapper_get_words(self, _, line):
    # yield each word in the line
    for word in WORD_RE.findall(line):
        word = word.lower()
        if word not in self.stop_words:
            yield (word, 1)

def combiner_count_words(self, word, counts):
    # sum the words we've seen so far
    yield (word, sum(counts))

def reducer_count_words(self, word, counts):
    # send all (num_occurrences, word) pairs to the same reducer.
    # num_occurrences is so we can easily use Python's max() function.
    yield None, (sum(counts), word)

# discard the key; it is just None
def reducer_find_max_word(self, _, word_count_pairs):
    # each item of word_count_pairs is (count, word),
    # so yielding one results in key=counts, value=word
    try:
        yield max(word_count_pairs)
    except ValueError:
        pass

def steps(self):
    return [
        MRStep(mapper_init=self.mapper_init,
               mapper=self.mapper_get_words,
               combiner=self.combiner_count_words,
               reducer=self.reducer_count_words),
        MRStep(reducer=self.reducer_find_max_word)
    ]

if __name__ == '__main__':
    MRMostUsedWord.run()

它允许找到最常用的单词,但我不确定如何用json属性而不是单词来做这件事。
样的json:
{“审查标识”:“产品编号”:“产品_中文_0440378”,“审阅者标识”:“评论者_en_0133349”,“明星”:“1”,“审查_正文”:“内阁点都脱离了支持...得到了我”,“审查_标题”:“不能使用”,“语言”:“en”,“产品类别”:“家居装修”}
{“审查标识”:“en_0311558”,“产品标识”:“产品_en_0399702”,“审阅者标识”:“评论者_en_0152034”,“明星”:“1”,“审查_正文”:“我收到了第一个订购的产品,它坏了,所以我又订购了一次。第二个比第一个坏的地方更多。我不能责怪运输过程,因为它是收缩 Package 和盒装的。",“review_title”:《产品是垃圾》,《语言》:“en”,“产品类别”:“家”}

qv7cva1a

qv7cva1a1#

对我来说,使用json.loads很有用,比如:

def mapper(self, _, line):
    review = json.loads(line)

相关问题