hadoop中map函数的输入分割

0sgqnhkj  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(351)

这是我在hadoop中的第一个实现。我正在尝试在map reduce中实现我的概率数据集算法。在我的数据集中,最后一列将有一些id(数据集中唯一id的数量等于集群中的节点数量)。我必须根据这个列值来划分数据集,并且集群中的每个节点都应该处理每组记录。
例如,如果集群中有三个节点,对于下面的数据集,一个节点应该处理id=1的所有记录,另一个节点处理id=2的记录,另一个节点处理id=3的记录

name time  dept  id
--------------------
 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

我的map函数应该将每个分割作为一个输入,并在每个节点中并行处理它。
我只是想了解,在hadoop中哪种方法是可行的。输入这个数据集作为我的map函数的输入,并通过map传递一个额外的参数来根据id值分割数据。或者预先将数据拆分为“n”(节点数)子集并将其加载到节点中,如果这是正确的方法,那么如何根据不同节点中的值和负载来拆分数据。因为,我从阅读资料中了解到的是,hadoop根据指定的大小将数据分割成块。如何在加载时指定特定条件。总而言之,我正在用python编写我的程序。
有人请指点一下。谢谢

bz4sfanl

bz4sfanl1#

对您来说,最简单的事情可能是让Map器以id作为键输出数据,这将保证一个reducer将获得特定id的所有记录,然后在reducer阶段进行处理。
例如,
输入数据:

b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

Map程序代码:


# !/usr/bin/env python

import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[-1]
    print key + "\t" + line

Map输出:

1  b1  2:00pm z1   1
 2  b2  3:00pm z2   2
 1  c1  4:00pm y2   1
 3  b3  3:00pm z3   3
 2  c4  4:00pm x2   2

减速器1输入:

1  b1  2:00pm z1   1
 1  c1  4:00pm y2   1

减速器2输入:

2  b2  3:00pm z2   2

减速器3输入:

3  b3  3:00pm z3   3

减速机代码:


# !/usr/bin/env python

import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    orig_line = "\t".join(cols[1:])
    # do stuff...

请注意,这样一个reducer可能会获得多个键,但数据将被排序,您可以使用mapred.reduce.tasks选项控制reducer的数量。
编辑如果你想收集你的数据在减速器每键你可以这样做(不确定它会运行,但你得到的想法)


# !/usr/bin/env python

import sys
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

last_key = None
data = []
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    if last_key and key != last_key:
        process_data(last_key, data)
        data = []
    orig_line = "\t".join(cols[1:])
    data.append(orig_line)
    last_key = key
process_data(last_key, data)

如果您不担心在reducer步骤中内存不足,可以将代码简化为:


# !/usr/bin/env python

import sys
from collections import defaultdict
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

all_data = defaultdict(list)
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    orig_line = "\t".join(cols[1:])
    all_data[key].append(orig_line)
for key, data in all_data.iteritems():
    process_data(key, data)
pod7payv

pod7payv2#

如果我理解您的问题,最好的方法是将数据集加载到配置单元表中,然后用python编写udf。在那之后,像这样做:

select your_python_udf(name, time, dept, id) from table group by id;

这看起来像是reduce阶段,所以在启动查询之前可能需要这个

set mapred.reduce.tasks=50;

如何创建自定义自定义项:
Hive插件
创建函数

相关问题