我写了两个Map器map1和map2
map1-读取hdfs中的seq文件并对其进行处理。
map2—从hbase读取数据并生成与map1相同的键、值对。
最后我把它们合并到了reduceral中。
问题是只有一个Map器正在运行,并且作业完成时没有任何类型的错误。只有最后一个Map器正在运行(即 TableMapReduceUtil
). 如果我交换线路 TableMapReduceUtil
以及 MultipleInputs
,然后是最后一个。 MultipleInputs
Map器运行。
我做错什么了?两种方案都不会引发错误。我还阅读了2个文件使用 addCacheFile()
但我想这并不重要。
Job job3 = Job.getInstance(config, "Test");
if (true) {
job3.setJarByClass(Main.class);
job3.setMapOutputKeyClass(ImmutableBytesWritable.class);
job3.setMapOutputValueClass(ImmutableBytesWritable.class);
job3.setOutputKeyClass(ImmutableBytesWritable.class);
job3.setOutputValueClass(ImmutableBytesWritable.class);
job3.getConfiguration().set("StartDate", c_startDate);
job3.getConfiguration().set("EndDate", c_endDate);
job3.addCacheFile(new URI(args[8]));
job3.getConfiguration().set("abc", args[8].substring(args[8].lastIndexOf("/") + 1));
job3.addCacheFile(new URI(args[9]));
job3.getConfiguration().set("xyz", args[9].substring(args[9].lastIndexOf("/") + 1));
job3.setReducerClass(ReducerAll.class);
job3.setOutputFormatClass(SequenceFileOutputFormat.class);
job3.setNumReduceTasks(10);
Scan scan = new Scan();
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("hbasetable"));
scan.setCaching(300);
scan.setCacheBlocks(false);
MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
TableMapReduceUtil.initTableMapperJob(
"hbasetable",
scan,
Map2.class,
ImmutableBytesWritable.class,
ImmutableBytesWritable.class,
job3);
FileOutputFormat.setOutputPath(job3, new Path(args[7]));
job3.waitForCompletion(true);
if (!job3.waitForCompletion(true)) {
return (1);
}
1条答案
按热度按时间lskq00tm1#
我相信这种行为是由于以下两个原因lines:-
只有一个工作3
尽管您已经提到有两个Map器,但请查看Map器类型。Map绘制者
Map1
会不同于Map2
.Map1
是一个Mapper
,和Map2
是一个TableMapper
将这两条语句放在一起并不意味着它们基本上都是在job3的multipleinputs设置中使用的。对于map1,multipleinputs仍然只有一个设置。map2的另一个设置仍然是独立的。现在开始执行死刑。两种配置multipleinputs或tablemapreduceutil中的后一种会覆盖job3中的前一种配置,因此只执行一个Map器
请让我知道,如果这是不正确的,我还没有验证我的理解,我在我的机器这里提出。