如何在hdfs中找到文件之间的对称性差异?

5q4ezhmt  于 2021-07-13  发布在  Hadoop
关注(0)|答案(1)|浏览(406)

我有两个hdfs文件:/my/path/in/hdfs/part-r-(大约1000个部分,每个约10000行)和/my/another/path/in/hdfs/part-r-(大小相同)。第一个文件包含以下格式的数据:

id1 111
id6 212
id3 984

等等。第二个是:

999 id8
15 id4
93 id1

我想找到第一个文件中没有出现在第二个文件中的所有id,反之亦然。有什么简单的方法吗?

juzqafwq

juzqafwq1#

我必须承认,我怀疑这种计算是否适合mapreduce的严格范例,仅仅是因为这个过程有多复杂,计算量有多大(尽管你说过在你的案例中输入的两个文件大小相同),所以我认为这将是一个很好的例子,找到捷径,同时保持简单的事情。
首先,为了消除额外的iofat,您可能需要将这两个文件放在一个目录中(例如 \input 这里为了简单起见)只是为了绕过多输入的混乱。之后,只需一个mapreduce作业就可以更轻松地完成任务。
在Map阶段,您所需要做的就是将两个文件中的id设置为键,并将它们出现的“文件名”设置为它们的值(这是一种安全的方法,可以找到对称性差异,同时继续概括一个id在一个文件中可以被多次看到)。那些“文件名”其实不需要是实际的文件名,你可以把字符串 A 以及 B 指示此特定行中的特定id分别与第一个或第二个文件一起找到。
在reduce阶段,可以将引用单个键/id的所有值放在 HashSet 集合,其中包含您输入的所有唯一值。这意味着对于每个减速器(又名每个id),一个 HashSet 创建来放置 A 以及 B 字符串,仅存储其中一个示例。所以:
仅在第一个文件中看到的id将具有 HashSet 仅含 A 在里面,
仅在第二个文件中看到的id将具有 HashSet 仅含 B 在里面,
在两个文件中看到的id将具有 HashSet 收藏 A 以及 B 在它里面(也就是文件的交叉点,你不需要)。
有了它,你就可以简单地检查每个身份证 HashSet 只写前面两个选项中的那些。
这种类型的作业可以是这样的(这里的reduce函数实际上不需要在键值对中有一个值,所以我只把 String 使事情更简单):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class SymDiff 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <ID, file>
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> 
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            String[] line = value.toString().split(" ");    // split each line to two columns

            // if the first column consists of integers, put the ID from the 2nd column as the key
            // and set "B" as the value to imply that the particular ID was found on the second file
            // else, put the ID from the first column as the key
            // and set "A" as the value to imply that the particular ID was found on the first file
            if(line[0].matches("\\d+"))     // (lazy way to see if the first string is an int without throwing an exception)
                context.write(new Text(line[1]), new Text("B"));
            else
                context.write(new Text(line[0]), new Text("A"));
        }
    }

    /* input: <ID, file>
     * output: <ID, "">
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            HashSet<String> list_of_files = new HashSet<String>();

            // store all the instances of "A" and "B" for each ID in a HashSet with unique values
            for(Text value : values)
                list_of_files.add(value.toString());

            // only write the IDs which they values only contain "A" or "B" (and not both) on their set 
            if(list_of_files.contains("A") && !list_of_files.contains("B") || (!list_of_files.contains("A") && list_of_files.contains("B")))
                context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("output");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job job = Job.getInstance(conf, "Symmetric Difference");
        job.setJarByClass(SymDiff.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);    
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input_dir);
        FileOutputFormat.setOutputPath(job, output_dir);
        job.waitForCompletion(true);
    }
}

您可以在下面的屏幕截图中看到所需的输出:

相关问题