我在用mapreduce做矩阵乘法。我使用下面的代码构建了一个jar文件。代码对于较小的矩阵工作得非常好,但当文件变大时,Map阶段将在67%停止,然后它将给出以下错误:
Java.Lang.ArrayIndexOutOfBoundsException: 2
at MatrixMult$mapper.map(MatrixMult.java:44)
at Matrix$mapper.map(MatrixMult.java:1)
at org.apache.hadoop.mapreduce.mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.Maptask.runNewMapper(mapTask.java:793)
at org.apache.hadoop.mapred.maptask.run(maptask.java:341)
at org.apache.hadoop.mapred.yarnChild$2.run(YarnChild.java:164)
at java.security.accesscontroller.dopriviledged(Native Method)
at javax.security.auth.subject.doAs(Subject.Java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
当我使用较小的矩阵时,mapreduce就起作用了。我将在下面发布mapper和reducer的代码:
public class Map
extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
int m = Integer.parseInt(conf.get("m"));
int p = Integer.parseInt(conf.get("p"));
String line = value.toString();
// (M, i, j, Mij);
String[] indicesAndValue = line.split(",");
Text outputKey = new Text();
Text outputValue = new Text();
if (indicesAndValue[0].equals("M")) {
for (int k = 0; k < p; k++) {
outputKey.set(indicesAndValue[1] + "," + k);
// outputKey.set(i,k);
outputValue.set(indicesAndValue[0] + "," + indicesAndValue[2]
+ "," + indicesAndValue[3]);
// outputValue.set(M,j,Mij);
context.write(outputKey, outputValue);
}
} else {
// (N, j, k, Njk);
for (int i = 0; i < m; i++) {
outputKey.set(i + "," + indicesAndValue[2]);
outputValue.set("N," + indicesAndValue[1] + ","
+ indicesAndValue[3]);
context.write(outputKey, outputValue);
}
}
}
}
public class Reduce
extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String[] value;
//key=(i,k),
//Values = [(M/N,j,V/W),..]
HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
for (Text val : values) {
value = val.toString().split(",");
if (value[0].equals("M")) {
hashA.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
} else {
hashB.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
}
}
int n = Integer.parseInt(context.getConfiguration().get("n"));
float result = 0.0f;
float m_ij;
float n_jk;
for (int j = 0; j < n; j++) {
m_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
n_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
result += m_ij * n_jk;
}
if (result != 0.0f) {
context.write(null,
new Text(key.toString() + "," + Float.toString(result)));
}
}
}
public class MatrixMultiply {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MatrixMultiply <in_dir> <out_dir>");
System.exit(2);
}
Configuration conf = new Configuration();
// M is an m-by-n matrix; N is an n-by-p matrix.
conf.set("m", "1000");
conf.set("n", "100");
conf.set("p", "1000");
@SuppressWarnings("deprecation")
Job job = new Job(conf, "MatrixMultiply");
job.setJarByClass(MatrixMultiply.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
我不太清楚错误是从哪里来的,但我知道每当我使用一个大文件时,我都会遇到这个问题
暂无答案!
目前还没有任何答案,快来回答吧!