Matrix multiplication with Hadoop MapReduce

wswtfjt7 · posted 2021-07-15 in Hadoop

I'm doing matrix multiplication with MapReduce, using the code below built into a jar file. The code works perfectly well for smaller matrices, but when the input file gets larger, the map phase stops at 67% and then fails with the following error:

    java.lang.ArrayIndexOutOfBoundsException: 2
        at MatrixMult$Map.map(MatrixMult.java:44)
        at MatrixMult$Map.map(MatrixMult.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
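
An ArrayIndexOutOfBoundsException: 2 means an array holding at most two elements was indexed at position 2. The only indexed array in the mapper below is the result of line.split(","), so a blank or truncated input record is one plausible trigger. A minimal sketch of that failure mode, using a hypothetical malformed line:

    // Hypothetical record: split() returns only as many fields as the
    // line actually contains, so indexing past the end throws.
    String[] fields = "M,0".split(",");   // truncated record -> length 2
    System.out.println(fields[2]);        // ArrayIndexOutOfBoundsException: 2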

MapReduce works whenever I use smaller matrices. The mapper, reducer, and driver code is below:

    // Map.java
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            int m = Integer.parseInt(conf.get("m"));
            int p = Integer.parseInt(conf.get("p"));
            String line = value.toString();
            // Each record is (matrix, row, col, value), e.g. (M, i, j, Mij).
            String[] indicesAndValue = line.split(",");
            Text outputKey = new Text();
            Text outputValue = new Text();
            if (indicesAndValue[0].equals("M")) {
                // M(i,j) contributes to every output cell (i,k):
                // emit key (i,k), value (M, j, Mij) for each column k.
                for (int k = 0; k < p; k++) {
                    outputKey.set(indicesAndValue[1] + "," + k);
                    outputValue.set(indicesAndValue[0] + "," + indicesAndValue[2]
                            + "," + indicesAndValue[3]);
                    context.write(outputKey, outputValue);
                }
            } else {
                // Record is (N, j, k, Njk): N(j,k) contributes to every output
                // cell (i,k), so emit key (i,k), value (N, j, Njk) for each row i.
                for (int i = 0; i < m; i++) {
                    outputKey.set(i + "," + indicesAndValue[2]);
                    outputValue.set("N," + indicesAndValue[1] + ","
                            + indicesAndValue[3]);
                    context.write(outputKey, outputValue);
                }
            }
        }
    }

    // Reduce.java
    import java.io.IOException;
    import java.util.HashMap;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // key = (i,k); values = [(M, j, Mij), (N, j, Njk), ...]
            String[] value;
            HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
            HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
            for (Text val : values) {
                value = val.toString().split(",");
                if (value[0].equals("M")) {
                    hashA.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
                } else {
                    hashB.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
                }
            }
            int n = Integer.parseInt(context.getConfiguration().get("n"));
            float result = 0.0f;
            float m_ij;
            float n_jk;
            // Dot product of row i of M with column k of N over the shared dimension j.
            for (int j = 0; j < n; j++) {
                m_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
                n_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
                result += m_ij * n_jk;
            }
            if (result != 0.0f) {
                // A null key makes TextOutputFormat write the value by itself.
                context.write(null,
                        new Text(key.toString() + "," + Float.toString(result)));
            }
        }
    }

    // MatrixMultiply.java
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MatrixMultiply {

        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: MatrixMultiply <in_dir> <out_dir>");
                System.exit(2);
            }
            Configuration conf = new Configuration();
            // M is an m-by-n matrix; N is an n-by-p matrix.
            conf.set("m", "1000");
            conf.set("n", "100");
            conf.set("p", "1000");
            @SuppressWarnings("deprecation")
            Job job = new Job(conf, "MatrixMultiply");
            job.setJarByClass(MatrixMultiply.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }
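
The comments in the mapper imply one CSV record per matrix entry, tagged with the matrix name. A small hypothetical input for a 2x2 product (i.e. m = n = p = 2, not the 1000/100/1000 dimensions hard-coded in the driver) would presumably look like:

    M,0,0,1.0
    M,0,1,2.0
    M,1,0,3.0
    M,1,1,4.0
    N,0,0,5.0
    N,0,1,6.0
    N,1,0,7.0
    N,1,1,8.0

Any record that deviates from this four-field shape would make the mapper's array accesses go out of bounds.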

I'm not quite sure where the error is coming from, but I do know that I run into it whenever I use a large file.
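
One plausible fix, given that the crash only appears with larger files, is to guard the mapper against records that do not split into exactly four fields (a blank trailing line or a truncated record, for example). A minimal sketch, assuming the four-field format above; the counter name here is made up for illustration:

    // Sketch: skip malformed records instead of indexing past the array,
    // and count them so the job report shows how many were dropped.
    String[] indicesAndValue = value.toString().split(",");
    if (indicesAndValue.length != 4) {
        context.getCounter("MatrixMult", "MalformedRecords").increment(1);
        return;
    }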

No answers yet.
