totalorderpartion和chainmapper

sr4lhrrt  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(260)

我有一个链Map器与2个Map器关联。我试图在链中的最后一个Map器上执行totalorderpartition,但没有成功。
有没有一种方法可以基于链中第n个Map器上的一些采样来强制分区?

public class WordCountChain extends Configured implements Tool
{
    @Override
    public int run(String[] args) throws Exception 
    {
        Job job = new Job(getConf(), "Word Count V1 (Chain)");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /***********First Mapper***********/
        Configuration wcpMapperConf = new Configuration(false);
        ChainMapper.addMapper(job, WordCountPreparationMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, wcpMapperConf);

        /***********Second Mapper***********/
        Configuration wcMapperConf = new Configuration(false);
        ChainMapper.addMapper(job, Mapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, wcMapperConf);

        /*******This enforces the Sampling/Partitioning over the First Mapper*******/
        //job.setInputFormatClass(SequenceFileInputFormat.class);
        //InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text, IntWritable>(0.1, 10000, 10);
        //InputSampler.writePartitionFile(job, sampler);
        //job.addCacheFile( new URI( TotalOrderPartitioner.getPartitionFile(getConf()) ) );

        job.setNumReduceTasks(10);
        job.setReducerClass(WordCountReducer.class);
        return (job.waitForCompletion(true) ? 0 : 1);
     }

     public static void main(String[] args) throws Exception 
     {
        int exitCode = ToolRunner.run(new WordCountChain(), args);
        System.exit(exitCode);
     }
}
41zrol4v

41zrol4v1#

不幸的是,randomsampler甚至在作业开始之前就运行了,事实上它是在您调用时运行的

InputSampler.writePartitionFile(job, sampler);

这意味着它不会在任何Map器的输出上运行,而是在作业的输入数据集上运行。
如果需要基于第n个Map器的输出进行分区,可以将作业分为两个作业,一个仅Map作业和一个mapreduce作业。第一个将运行Map器链到第n个Map器,然后只存储该Map器的输出。第二个作业将基于输入(这将是第n个Map器的输出)进行采样和分区,然后运行其余的Map器和缩减器。

相关问题