hadoop 如何接收字典列表作为MRJob作业的参数?

wko9yo5t  于 2023-03-29  发布在  Hadoop
关注(0)|答案(1)|浏览(218)

我理解如何以编程方式接收输出,以及如何运行MRJob作业。这在here中有明确的解释。然而,我很难理解如何将字典列表或其他文件中的任何变量传递到MrJob作业中。而不是像我可能有“words.txt”一样有一个输入文件,相反,我想把said“words”作为一个包含这些单词的变量(类型列表)传递。
更具体地说,假设我已经说了列表:

mylist = [
     {"name": "Kayer", "Job": "Programmer"},
     {"name": "Angela", "Job": "Designer"},
     {"name": "Eve", "Job": "Programmer"},
     {"name": "Robert", "Job": "Programmer"},
]

我想运行一个MrJob作业,它会获取上述列表,并返回给我(例如)工作是程序员的人数。
最后,为了系统的设计,我可能不会将列表临时存储到文本文件或任何文件中。
据我目前所知,我无法在我试图运行的作业的同一个类和/或文件中运行以下代码:

mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
     runner.run()
     for key, value in mr_job.parse_output(runner.cat_output()):
         ... # do something with the parsed output

因此,我设置了一个不同的文件,然后我不知道如何将数据发送到我的MrJob...我甚至不知道是否有一种方法可以让我把数据传递到MrJob摆在首位。

h7wcgrx3

h7wcgrx31#

首先,我强烈建议切换到Spark,因为它比MrJob好得多,如果像我一样你不能或者你只是想知道,那么这里有一个解决方案。
查看MrJob文档,我们可以看到我们可以编程运行作业(如here所示)。这允许我们接受输入(在我们的例子中是字典)并使用MrSpark将其作为MapReduce作业进行处理。
为了使所有这些工作,让我们假设我们有一个名为main.py的文件,我们将从其中运行作业。根据文档,运行作业的文件与包含作业的文件不同是至关重要的。
示例main.py

from io import BytesIO
from test_job import TestJob

data = [
     {"name": "Kayer", "Job": "Programmer"},
     {"name": "Angela", "Job": "Designer"},
     {"name": "Eve", "Job": "Programmer"},
     {"name": "Robert", "Job": "Programmer"},
]

if __name__ == '__main__':
    # Prepare the data to be passed
    modified_data = ''
    for d in data:
        modified_data += f'{str(d)}\n'

    stdin = BytesIO(bytes(modified_data, 'utf8'))

    # Creates the job and gives it the data
    job = TestJob(['--no-conf', '-'])
    job.sandbox(stdin)

    # Returns the results for the given job
    with job.make_runner() as runner:
        runner.run()
        for key, value in job.parse_output(runner.cat_output()):
            print(f'key: {key} -> value: {value}')

在准备好这些之后,我们需要确保作业现在可以读取接收到的输入并相应地处理它。为此,我们创建了一个新文件,在本例中称为test_job.py
示例test_job.py

from mrjob.job import MRJob
import json

class TestJob(MRJob):
    def mapper(self, _, line):
        # Takes the data and prepares it to be loaded into a dictionary
        test = line.replace("'", '"')
        test = test.replace('None', 'null')
        # Make the received line into a dictionary
        data = json.loads(test)

        if 'Job' in data.keys():
            if data['Job'] == 'Programmer':
                yield 'Programmer', 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    TestJob.run()

相关问题