Matrix multiplication with Hadoop MapReduce

wswtfjt7 · posted 2021-07-15 in Hadoop

I am doing matrix multiplication with MapReduce and built a jar file from the code below. It works fine for smaller matrices, but as soon as the input file gets large the map phase stalls at 67% and then fails with the following error:

java.lang.ArrayIndexOutOfBoundsException: 2
    at MatrixMult$Map.map(MatrixMult.java:44)
    at MatrixMult$Map.map(MatrixMult.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

MapReduce works whenever I use smaller matrices. The mapper and reducer code is posted below:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Map
  extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                int m = Integer.parseInt(conf.get("m"));
                int p = Integer.parseInt(conf.get("p"));
                String line = value.toString();
                // (M, i, j, Mij);
                String[] indicesAndValue = line.split(",");
                Text outputKey = new Text();
                Text outputValue = new Text();
                if (indicesAndValue[0].equals("M")) {
                        for (int k = 0; k < p; k++) {
                                outputKey.set(indicesAndValue[1] + "," + k);
                                // outputKey.set(i,k);
                                outputValue.set(indicesAndValue[0] + "," + indicesAndValue[2]
                                                + "," + indicesAndValue[3]);
                                // outputValue.set(M,j,Mij);
                                context.write(outputKey, outputValue);
                        }
                } else {
                        // (N, j, k, Njk);
                        for (int i = 0; i < m; i++) {
                                outputKey.set(i + "," + indicesAndValue[2]);
                                outputValue.set("N," + indicesAndValue[1] + ","
                                                + indicesAndValue[3]);
                                context.write(outputKey, outputValue);
                        }
                }
        }
}
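
For reference, the mapper expects one comma-separated matrix entry per line, tagged M or N. A small illustrative input (my own example, not from the post) would look like:

    M,0,0,1.0
    M,0,1,2.0
    N,0,0,3.0
    N,1,0,4.0

Each M entry is replicated p times (once per output column k) and each N entry m times (once per output row i), so that every (i,k) reducer key receives all the terms of its dot product.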

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.Text;

public class Reduce
  extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
                String[] value;
                //key=(i,k),
                //Values = [(M/N,j,V/W),..]
                HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
                HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
                for (Text val : values) {
                        value = val.toString().split(",");
                        if (value[0].equals("M")) {
                                hashA.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
                        } else {
                                hashB.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
                        }
                }
                int n = Integer.parseInt(context.getConfiguration().get("n"));
                float result = 0.0f;
                float m_ij;
                float n_jk;
                for (int j = 0; j < n; j++) {
                        m_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
                        n_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
                        result += m_ij * n_jk;
                }
                if (result != 0.0f) {
                        context.write(null,
                                        new Text(key.toString() + "," + Float.toString(result)));
                }
        }
}
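
For context, the reducer assembles one output cell as a dot product over the shared index j: result = sum over j of M[i][j] * N[j][k]. A tiny worked example (my own illustration): for key (i,k) with values [(M,0,2.0), (M,1,3.0), (N,0,4.0), (N,1,5.0)], the loop computes 2.0*4.0 + 3.0*5.0 = 23.0 and writes the line "i,k,23.0".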

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MatrixMultiply {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MatrixMultiply <in_dir> <out_dir>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // M is an m-by-n matrix; N is an n-by-p matrix.
        conf.set("m", "1000");
        conf.set("n", "100");
        conf.set("p", "1000");
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "MatrixMultiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
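
Assuming the three classes are packaged into a single jar (the jar name and HDFS paths below are placeholders, not from the original post), the job would typically be launched like this:

    hadoop jar matrixmult.jar MatrixMultiply /user/hadoop/matrix-in /user/hadoop/matrix-out

Note that m, n, and p are hardcoded in the driver, so they must match the dimensions of the matrices actually present in the input directory.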

I am not sure exactly where the error comes from, but I do know that it shows up whenever I use a large input file.
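
For what it's worth, ArrayIndexOutOfBoundsException: 2 at the array access suggests a line that splits into fewer than three fields: a blank line, for instance, splits to a single empty token, fails the equals("M") check, and the else branch then reads indicesAndValue[2]. A larger file is simply more likely to contain such a line. A minimal defensive sketch (the SafeMap class and counter names are my own illustration, not the poster's code) that validates each line before indexing into the split array:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SafeMap
  extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                String line = value.toString().trim();
                if (line.isEmpty()) {
                        // A blank line would otherwise fall into the N branch and crash.
                        context.getCounter("MatrixMult", "BlankLines").increment(1);
                        return;
                }
                String[] fields = line.split(",");
                if (fields.length != 4) {
                        // Fewer than 4 fields is exactly what triggers the reported exception.
                        context.getCounter("MatrixMult", "MalformedLines").increment(1);
                        return;
                }
                // ... the original M/N branching on fields[0] goes here unchanged.
        }
}

The counters show up in the job's output, so after a run you can see how many bad records the input actually contained.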
