使用mapreduce在hadoop中进行文件比较

6ss1mwsb  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(371)

a.txt文件

col1       col2    col3    col4    col5
A           120     140     160     180
B           200     220     240     260
D           400     420     440     460

b、 文本

col1      col2    col3    col4    col5
A          110     140     160     180
B          200     220     240     260
C          600     620     640     660

输出文件

A          120     140     160     180
A          110     140     160     180
B          200     220     240     260
D          400     420     440     460
C          600     620     640     660

1) col1和col2是这个函数中的主键,任何一个键被改变,我们都会显示两个记录,比如

in A.txt contain 1st Records:- A          120      140     160     180
in B.txt contain 1st Records:- A          110      140     160     180

在这个列中,2发生了变化,所以我必须显示两个记录
2) 如果两个文件上的记录没有变化(我的意思是看起来一样),我们只能显示一个记录
3) 在这两个文件中显示所有其他记录
最终输出应该如下所示

输出文件

A          120     140     160     180
A          110     140     160     180
B          200     220     240     260
D          400     420     440     460
C          600     620     640     660
inkz8wg9

inkz8wg91#

这里是 mapreduce 解决方案:
将两个或多个文件放在一个目录中(输入- arg1 ),它会将所有文件与一个符合您所有要求的文件合并。它还匹配col3以结束一个键的非maching行(col1+col2)有关详细信息,请参阅注解。。。

public class FileCompare  extends Configured implements Tool{

    public static class FileComapreMapper extends Mapper<Object, Text, Text, Text> {
        int lineno=0;

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            try{
                lineno++;
                System.out.println(lineno + " -> " + value);
                //skip header - uncomment this line to include header in output
                if(lineno == 1) return; 

                String[] fields = value.toString().split("\\s+");//assuming input recs are whitespace seperated
                String col1_col2 = fields[0] + "," + fields[1]; //key
                String col3tolast="";
                for(int i=2; i < fields.length;i++)
                    col3tolast+=fields[i] + ","; //values

               col3tolast=col3tolast.substring(0, col3tolast.length()-1); //remove last char(',')
               context.write(new Text(col1_col2), new Text(col3tolast)); //send key, value pairs to reducer
            }catch(Exception e){
                System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage());
            }
        }   
    }

    public  static class FileComapreReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException {
            //Get unique col3 to last value
            Set<Text> uniqueCol3tolast = new HashSet<Text>();
            for(Text record : values)
                uniqueCol3tolast.add(record);
            //write key + value
            for(Text col3tolast:uniqueCol3tolast) //outputing tab delimited recs
                context.write(new Text(key.toString().replaceAll(",", "\t")), 
                        new Text(col3tolast.toString().replaceAll(",", "\t")));     
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FileCompare(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: <in> <out>");
            System.exit(2);
        }
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "merge-two-files");
        job.setJarByClass(FileCompare.class);
        job.setMapperClass(FileComapreMapper.class);
        job.setReducerClass(FileComapreReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        return job.waitForCompletion(true) ? 0 : 1;
    } 
}

pgpifvop

pgpifvop2#

使用pig.load加载两个文件,合并记录,然后将其区分开来。

A = LOAD 'A.txt' USING PigStorage('\t');
B = LOAD 'B.txt' USING PigStorage('\t');
C = UNION A,B;
D = DISTINCT C;
DUMP D;

相关问题