我在reduce阶段的reduce程序中使用multipleoutputs。我正在处理的数据集大约是270MB,我正在我的伪分布式单节点上运行它。我已经使用自定义可写为我的Map输出值。关键是数据集中存在的国家。
public class reduce_class extends Reducer<Text, name, NullWritable, Text> {
public void reduce(Text key,Iterable<name> values,Context context) throws IOException, InterruptedException{
MultipleOutputs<NullWritable,Text> m = new MultipleOutputs<NullWritable,Text>(context);
long pat;
String n;
NullWritable out = NullWritable.get();
TreeMap<Long,ArrayList<String>> map = new TreeMap<Long,ArrayList<String>>();
for(name nn : values){
pat = nn.patent_No.get();
if(map.containsKey(pat))
map.get(pat).add(nn.getName().toString());
else{
map.put(pat,(new ArrayList<String>()));
map.get(pat).add(nn.getName().toString());}
}
for(Map.Entry entry : map.entrySet()){
n = entry.getKey().toString();
m.write(out, new Text("--------------------------"), key.toString());
m.write(out, new Text(n), key.toString());
ArrayList<String> names = (ArrayList)entry.getValue();
Iterator i = names.iterator();
while(i.hasNext()){
n = (String)i.next();
m.write(out, new Text(n), key.toString());
}
m.write(out, new Text("--------------------------"), key.toString());
}
m.close();
}
}
以上是我的推理
问题
1) 上面的代码对于较小的数据集工作正常,但是由于270MB数据集的堆空间而失败。
2) 使用country作为键在单个iterable集合中传递相当大的值。我试图解决这个问题,但是mutlipleoutputs为给定的一组键创建了唯一的文件。关键是我不能附加一个已经存在的文件,这个文件是以前运行reduce和throws错误创建的。因此,对于特定的键,我必须创建新的文件。有办法解决这个问题吗。解决上述错误导致我将键定义为国家名称(我的最终排序数据),但抛出java堆错误。
样本输入
3858241,“杜兰德”,“菲利普”,“e.”,“r.”,“哈德逊”,“马萨诸塞州”,“美国”,“1 3858241”,“诺里斯”,“朗尼”,“h.”,“米尔福德”,“马”,“美国”,“2 3858242”,“古丁”,“埃尔温”,“r.”,“达尔文路120号”,“平克尼”,“密苏里州”,“美国”,“48169”,1 3858243,“皮耶龙”,“克劳德”,“雷蒙德”,“伊皮纳尔”,“fr”,“1 3858243”,“珍妮”,“琼”,“保罗”,“迪克内斯”,“德克内斯”,“fr”,“2 3858243,”祖卡罗“,“罗伯特”,“理查德”,“l.”,“l.”,“邮政信箱69”,“伍德斯托克”,“ct”,“美国”,“06281”,1
小数据集的示例输出
目录结构示例。。。
ca-r-00000美元
fr-r-00000型
魁北克-r-00000
德克萨斯州r-00000
us-r-00000美元
- 个别内容*
3858241菲利普e。杜兰德
朗尼h。诺里斯
3858242
埃尔温r。古丁
3858244
1条答案
按热度按时间ff29svar1#
我知道我在回答一个很老的问题,但无论如何让我在这里提出一些想法。似乎您正在reducer中创建一个treemap,其中包含一个reduce调用中获得的所有记录。在mapreduce中,您无法在内存中保存所有记录,因为它永远不会缩放。你在做一张Map
patent_no
以及所有names
与此相关patent_no
. 你只需要根据patent_no
,那么为什么不利用mapreduce框架的排序呢。你应该包括
patent_no
以及name
随着country
在可写密钥本身中。写下你的
Partitioner
仅根据…划分country
.应打开排序
country
,patent_no
,name
.你应该写下你的名字
Grouping comparator
分组country
,patent_no
.结果所有的记录
country
将转到同一个减速机并按patent_no
以及name
. 在同一个减速机中,不同的专利号会被不同的减速机调用。现在你只需要简单地把它写在乘法输出中。这样就可以去掉任何内存树Map。我建议你注意以下几点:
不创建
new MultipleOutputs
每次在reduce方法中,应该编写一个setup()
方法并在setup()
方法。不创建
new Text()
每次,而是在setup方法中创建一个示例,并通过set("string")
方法Text
. 你可以说这有什么意义,java的gc无论如何都会垃圾收集它。但是您应该总是尽量使用内存,这样java的垃圾收集就不那么频繁地被调用了。