hadoopmapreduce2文件过滤?

ztyzrc3y  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(373)

我需要打印出那些没有订单号的客户的“姓名”。我知道我必须使用Map器方法来示例化变量。我必须使用2个Map器,因为有2个输入文件。在reduce阶段,我必须过滤掉没有订单的客户。但是,如何筛选出那些没有订单号的客户?
文件1.txt

  1. Cust.No. Name
  2. 1 Adam
  3. 2 Abe
  4. 3 Alex
  5. 4 Jones

文件2.txt

  1. Order.Num. Cust.No. Price
  2. 01 1 5422
  3. 02 1 23
  4. 03 2 1265
  5. 04 3 127

我做了什么
最初在reducer方法中,我对键进行循环,并检查它是否与现有键匹配:

  1. if (!(Data[0].equals("key")))
  2. {
  3. System.out.println(Data[1]);
  4. }

但是,它打印每一行。

xienkqul

xienkqul1#

看起来像是一个常规的reduce-side连接,所以它可能是一个简单的用例,然而这些类型的计算往往在工作负载方面变得非常残酷。这意味着我们必须找到捷径,以确保应用程序能够很好地扩展到更大的输入规模。
为应用程序的执行节省时间/空间的最常见的方法是,尝试设计可能的几个mr作业,使我们可以在保留所有功能的同时“剪切”一个或多个作业,或者尝试最小化将在输入数据处实现的(自定义)Map器的数量。这两个函数中的后一个对于您正在尝试实现的这种过滤非常常见,因为我们可以很容易地使用一个map函数,它的每个示例将检查当前正在读取的文件的名称,从而相应地执行操作。
更具体地说,我们可以得到 File1.txt 以及 File2.txt 在Map程序开始遍历 setup 函数,并使用当前要读取的文件名来确定如何将文件中的数据切分并存储到键值对中。对于您的问题,此Map函数将输出两种类型的键值对: <customer_ID, customer_name> (对于中的数据) File1.txt ) <customer_ID, order_ID> (对于中的数据) File2.txt )
然后reduce函数的示例将为每个客户运行(当然,因为客户id和名称是唯一的),并访问分组的值,这些值只不过是一些 Text 包含此客户名称或订单id的对象。我们只想输出记录中没有任何订单的客户,因此我们所要做的就是检查此值列表的长度是否为 1 (也就是说,如果这个客户没有一对值,而另一个客户没有他的名字)。
为了展示这一点,我将两个输入文件放在一个目录中 /input 在hdfs中(我对中的列使用了两个制表符分隔符) File1.txt 和三个制表符分隔符 File2.txt . 如果您的文件在列之间有不同的选项卡或空格,您可以相应地更改它们):
文件1.txt

  1. Cust.No Name
  2. 1 Adam
  3. 2 Abe
  4. 3 Alex
  5. 4 Jones

文件2.txt

  1. Order.Num. Cust.No. Price
  2. 01 1 5422
  3. 02 1 23
  4. 03 2 1265
  5. 04 3 127

执行过滤的程序可以如下所示:

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.FileSystem;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.InputSplit;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import java.io.IOException;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. public class OrderListFilter
  17. {
  18. /* input: <byte_offset, line_of_dataset>
  19. * output: <customer_ID, customer_name> OR <customer_ID, order_ID>
  20. */
  21. public static class Map extends Mapper<LongWritable, Text, Text, Text>
  22. {
  23. private String current_filename = "";
  24. protected void setup(Context context)
  25. {
  26. // get the name of the current to-be-read file
  27. InputSplit split = context.getInputSplit();
  28. Path path = ((FileSplit) split).getPath();
  29. current_filename = path.getName();
  30. }
  31. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  32. {
  33. if(current_filename.equals("File1.txt")) // if mapper is reading through the customer's file
  34. {
  35. if(value.toString().contains("Cust.No")) // remove header
  36. return;
  37. else
  38. {
  39. String[] columns = value.toString().split("\t\t"); // 2 tabs as delimiter
  40. // write customer ID as key and name as value
  41. context.write(new Text(columns[0]), new Text(columns[1]));
  42. }
  43. }
  44. else if(current_filename.equals("File2.txt")) // if mapper is reading through the order's file
  45. {
  46. if(value.toString().contains("Cust.No")) // remove header
  47. return;
  48. else
  49. {
  50. String[] columns = value.toString().split("\t\t\t"); // 3 tabs as delimiter
  51. // write customer ID as key and order num as value
  52. context.write(new Text(columns[1]), new Text(columns[0]));
  53. }
  54. }
  55. }
  56. }
  57. /* input: <customer_ID, customer_name> OR <customer_ID, order_ID>
  58. * output: <customer_ID, customer_name>
  59. */
  60. public static class Reduce extends Reducer<Text, Text, Text, Text>
  61. {
  62. public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
  63. {
  64. List<String> customer_records = new ArrayList<String>();
  65. // put all the values in a list to find the size of them
  66. for(Text value : values)
  67. customer_records.add(value.toString());
  68. // if there's only one record, i.e. just the ID and the customer's name in they key-value pairs,
  69. // write their ID and name to output
  70. if(customer_records.size() == 1)
  71. context.write(key, new Text(customer_records.get(0)));
  72. }
  73. }
  74. public static void main(String[] args) throws Exception
  75. {
  76. // set the paths of the input and output directories in the HDFS
  77. Path input_dir = new Path("input");
  78. Path output_dir = new Path("output");
  79. // in case the output directory already exists, delete it
  80. Configuration conf = new Configuration();
  81. FileSystem fs = FileSystem.get(conf);
  82. if(fs.exists(output_dir))
  83. fs.delete(output_dir, true);
  84. // configure the MapReduce job
  85. Job job = Job.getInstance(conf, "Order List Filter");
  86. job.setJarByClass(OrderListFilter.class);
  87. job.setMapperClass(Map.class);
  88. job.setReducerClass(Reduce.class);
  89. job.setMapOutputKeyClass(Text.class);
  90. job.setMapOutputValueClass(Text.class);
  91. job.setOutputKeyClass(Text.class);
  92. job.setOutputValueClass(Text.class);
  93. FileInputFormat.addInputPath(job, input_dir);
  94. FileOutputFormat.setOutputPath(job, output_dir);
  95. job.waitForCompletion(true);
  96. }
  97. }

它的输出看起来是一个好的(忽略我的设置中的警告):

展开查看全部

相关问题