为什么输出文件中没有mapreduce矩阵和向量乘法?

xlpyo6sf  于 2021-07-13  发布在  Hadoop
关注(0)|答案(0)|浏览(334)

我想用mapreduce把一个1000x1000的矩阵和一个1000x1的向量相乘。输入文件(原帖以图片形式展示)由随机的单个数字组成,并以空格分隔。
我的代码如下:

  1. import java.io.IOException;
  2. import java.util.*;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.conf.*;
  5. import org.apache.hadoop.io.*;
  6. import org.apache.hadoop.mapreduce.*;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  11. import org.apache.hadoop.util.GenericOptionsParser;
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  13. public class Matrix_Vector {
  14. public static class MatrixMapper extends
  15. Mapper<LongWritable, Text, Text, Text> {
  16. private String flag = null;// name of the data set
  17. private int rowNum = 1000;// row of matrix A
  18. private int colNum = 1;//cols of matrix B (for vector is one)
  19. private int rowIndexA = 1; // row index of matrix A
  20. private int rowIndexB = 1; // row index of matrix B
  21. @Override
  22. protected void setup(Context context) throws IOException,
  23. InterruptedException {
  24. flag = ((FileSplit) context.getInputSplit()).getPath().getName();// Get the file name
  25. }
  26. @Override
  27. protected void map(LongWritable key, Text value, Context context)
  28. throws IOException, InterruptedException {
  29. String[] tokens = value.toString().split(" ");
  30. if ("matrix".equals(flag)) {
  31. for (int i = 1; i <= colNum; i++) {
  32. Text k = new Text(rowIndexA + "," + i);
  33. for (int j = 0; j < tokens.length; j++) {
  34. Text v = new Text("a," + (j + 1) + "," + tokens[j]);
  35. context.write(k, v);
  36. }
  37. }
  38. rowIndexA++;// move to next line
  39. } else if ("vector".equals(flag)) {
  40. for (int i = 1; i <= rowNum; i++) {
  41. for (int j = 0; j < tokens.length; j++) {
  42. Text k = new Text(i + "," + (j + 1));
  43. Text v = new Text("b," + rowIndexB + "," + tokens[j]);
  44. context.write(k, v);
  45. }
  46. }
  47. rowIndexB++;// move to next line
  48. }
  49. }
  50. }
  51. public static class MatrixReducer extends
  52. Reducer<Text, Text, Text, IntWritable> {
  53. @Override
  54. protected void reduce(Text key, Iterable<Text> values, Context context)
  55. throws IOException, InterruptedException {
  56. Map<String, String> mapA = new HashMap<String, String>();
  57. Map<String, String> mapB = new HashMap<String, String>();
  58. for (Text value : values) {
  59. String[] val = value.toString().split(",");
  60. if ("a".equals(val[0])) {
  61. mapA.put(val[1], val[2]);
  62. } else if ("b".equals(val[0])) {
  63. mapB.put(val[1], val[2]);
  64. }
  65. }
  66. int result = 0;
  67. Iterator<String> mKeys = mapA.keySet().iterator();
  68. while (mKeys.hasNext()) {
  69. String mkey = mKeys.next();
  70. if (mapB.get(mkey) == null) {
  71. continue;
  72. }
  73. result += Integer.parseInt(mapA.get(mkey))
  74. * Integer.parseInt(mapB.get(mkey));
  75. }
  76. context.write(key, new IntWritable(result));
  77. }
  78. }
  79. public static void main(String[] args) throws Exception {
  80. Configuration conf = new Configuration();
  81. //job start running
  82. long main_start=System.currentTimeMillis();
  83. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  84. if (otherArgs.length != 2)
  85. {
  86. System.err.println("Usage: MatrixVector <in> <out>");
  87. System.exit(2);
  88. }
  89. Job job = Job.getInstance(conf, "Matrix_Vector");
  90. job.setJarByClass(Matrix_Vector.class);
  91. job.setMapOutputKeyClass(Text.class);
  92. job.setMapOutputValueClass(Text.class);
  93. job.setMapperClass(MatrixMapper.class);
  94. job.setReducerClass(MatrixReducer.class);
  95. job.setOutputKeyClass(Text.class);
  96. job.setOutputValueClass(Text.class);
  97. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  98. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  99. job.setInputFormatClass(TextInputFormat.class);
  100. job.setOutputFormatClass(TextOutputFormat.class);
  101. if (job.waitForCompletion(true)) {
  102. long main_end=System.currentTimeMillis();
  103. System.out.println("The process is end");
  104. System.out.println("Time of running is "+ " : "+(main_end - main_start) +" ms");
  105. System.exit(0);
  106. }else {
  107. System.out.println("The process ended with false");
  108. }
  109. }
  110. }

这是我得到的结果:输出路径下有一个 _SUCCESS 文件和一个输出文件,但输出文件里没有任何内容,从计数器 "Bytes Written=0" 也能看出这一点。我找不到代码逻辑哪里出了错。很抱歉结果只能以代码格式贴出,我仍在学习如何写出更好的帖子。

  1. 21/03/26 08:05:08 INFO mapreduce.Job: Job job_1616694898641_0007 completed successfully
  2. 21/03/26 08:05:10 INFO mapreduce.Job: Counters: 49
  3. File System Counters
  4. FILE: Number of bytes read=6
  5. FILE: Number of bytes written=606314
  6. FILE: Number of read operations=0
  7. FILE: Number of large read operations=0
  8. FILE: Number of write operations=0
  9. HDFS: Number of bytes read=2002216
  10. HDFS: Number of bytes written=0
  11. HDFS: Number of read operations=9
  12. HDFS: Number of large read operations=0
  13. HDFS: Number of write operations=2
  14. Job Counters
  15. Launched map tasks=2
  16. Launched reduce tasks=1
  17. Data-local map tasks=2
  18. Total time spent by all maps in occupied slots (ms)=102529
  19. Total time spent by all reduces in occupied slots (ms)=13848
  20. Total time spent by all map tasks (ms)=102529
  21. Total time spent by all reduce tasks (ms)=13848
  22. Total vcore-milliseconds taken by all map tasks=102529
  23. Total vcore-milliseconds taken by all reduce tasks=13848
  24. Total megabyte-milliseconds taken by all map tasks=104989696
  25. Total megabyte-milliseconds taken by all reduce tasks=14180352
  26. Map-Reduce Framework
  27. Map input records=2000
  28. Map output records=0
  29. Map output bytes=0
  30. Map output materialized bytes=12
  31. Input split bytes=212
  32. Combine input records=0
  33. Combine output records=0
  34. Reduce input groups=0
  35. Reduce shuffle bytes=12
  36. Reduce input records=0
  37. Reduce output records=0
  38. Spilled Records=0
  39. Shuffled Maps =2
  40. Failed Shuffles=0
  41. Merged Map outputs=2
  42. GC time elapsed (ms)=658
  43. CPU time spent (ms)=4140
  44. Physical memory (bytes) snapshot=519413760
  45. Virtual memory (bytes) snapshot=6241820672
  46. Total committed heap usage (bytes)=263872512
  47. Shuffle Errors
  48. BAD_ID=0
  49. CONNECTION=0
  50. IO_ERROR=0
  51. WRONG_LENGTH=0
  52. WRONG_MAP=0
  53. WRONG_REDUCE=0
  54. File Input Format Counters
  55. Bytes Read=2002004
  56. File Output Format Counters
  57. Bytes Written=0
  58. The process is end
  59. Time of running is : 182011 ms

任何帮助都将不胜感激!
谢谢您,

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题