我必须承认,我怀疑这种计算是否适合mapreduce的严格范例,仅仅是因为这个过程有多复杂,计算量有多大(尽管你说过在你的案例中输入的两个文件大小相同),所以我认为这将是一个很好的例子,找到捷径,同时保持简单的事情。 首先,为了消除额外的iofat,您可能需要将这两个文件放在一个目录中(例如 \input 这里为了简单起见)只是为了绕过多输入的混乱。之后,只需一个mapreduce作业就可以更轻松地完成任务。 在Map阶段,您所需要做的就是将两个文件中的id设置为键,并将它们出现的“文件名”设置为它们的值(这是一种安全的方法,可以找到对称性差异,同时继续概括一个id在一个文件中可以被多次看到)。那些“文件名”其实不需要是实际的文件名,你可以把字符串 A 以及 B 指示此特定行中的特定id分别与第一个或第二个文件一起找到。 在reduce阶段,可以将引用单个键/id的所有值放在 HashSet 集合,其中包含您输入的所有唯一值。这意味着对于每个减速器(又名每个id),一个 HashSet 创建来放置 A 以及 B 字符串,仅存储其中一个示例。所以: 仅在第一个文件中看到的id将具有 HashSet 仅含 A 在里面, 仅在第二个文件中看到的id将具有 HashSet 仅含 B 在里面, 在两个文件中看到的id将具有 HashSet 收藏 A 以及 B 在它里面(也就是文件的交叉点,你不需要)。 有了它,你就可以简单地检查每个身份证 HashSet 只写前面两个选项中的那些。 这种类型的作业可以是这样的(这里的reduce函数实际上不需要在键值对中有一个值,所以我只把 String 使事情更简单):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class SymDiff
{
/* input: <byte_offset, line_of_dataset>
* output: <ID, file>
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] line = value.toString().split(" "); // split each line to two columns
// if the first column consists of integers, put the ID from the 2nd column as the key
// and set "B" as the value to imply that the particular ID was found on the second file
// else, put the ID from the first column as the key
// and set "A" as the value to imply that the particular ID was found on the first file
if(line[0].matches("\\d+")) // (lazy way to see if the first string is an int without throwing an exception)
context.write(new Text(line[1]), new Text("B"));
else
context.write(new Text(line[0]), new Text("A"));
}
}
/* input: <ID, file>
* output: <ID, "">
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
HashSet<String> list_of_files = new HashSet<String>();
// store all the instances of "A" and "B" for each ID in a HashSet with unique values
for(Text value : values)
list_of_files.add(value.toString());
// only write the IDs which they values only contain "A" or "B" (and not both) on their set
if(list_of_files.contains("A") && !list_of_files.contains("B") || (!list_of_files.contains("A") && list_of_files.contains("B")))
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("input");
Path output_dir = new Path("output");
// in case the output directory already exists, delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir, true);
// configure the MapReduce job
Job job = Job.getInstance(conf, "Symmetric Difference");
job.setJarByClass(SymDiff.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, input_dir);
FileOutputFormat.setOutputPath(job, output_dir);
job.waitForCompletion(true);
}
}
1条答案
按热度按时间juzqafwq1#
我必须承认,我怀疑这种计算是否适合mapreduce的严格范例,仅仅是因为这个过程有多复杂,计算量有多大(尽管你说过在你的案例中输入的两个文件大小相同),所以我认为这将是一个很好的例子,找到捷径,同时保持简单的事情。
首先,为了消除额外的iofat,您可能需要将这两个文件放在一个目录中(例如
\input
这里为了简单起见)只是为了绕过多输入的混乱。之后,只需一个mapreduce作业就可以更轻松地完成任务。在Map阶段,您所需要做的就是将两个文件中的id设置为键,并将它们出现的“文件名”设置为它们的值(这是一种安全的方法,可以找到对称性差异,同时继续概括一个id在一个文件中可以被多次看到)。那些“文件名”其实不需要是实际的文件名,你可以把字符串
A
以及B
指示此特定行中的特定id分别与第一个或第二个文件一起找到。在reduce阶段,可以将引用单个键/id的所有值放在
HashSet
集合,其中包含您输入的所有唯一值。这意味着对于每个减速器(又名每个id),一个HashSet
创建来放置A
以及B
字符串,仅存储其中一个示例。所以:仅在第一个文件中看到的id将具有
HashSet
仅含A
在里面,仅在第二个文件中看到的id将具有
HashSet
仅含B
在里面,在两个文件中看到的id将具有
HashSet
收藏A
以及B
在它里面(也就是文件的交叉点,你不需要)。有了它,你就可以简单地检查每个身份证
HashSet
只写前面两个选项中的那些。这种类型的作业可以是这样的(这里的reduce函数实际上不需要在键值对中有一个值,所以我只把
String
使事情更简单):您可以在下面的屏幕截图中看到所需的输出: