我想用mapreduce把一个1000x1000的矩阵和一个1000x1的向量相乘。显示为图片的输入文件由随机的单个数字变量组成,并用空格分隔。请在此处输入图像描述
我的代码如下:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class Matrix_Vector {
public static class MatrixMapper extends
Mapper<LongWritable, Text, Text, Text> {
private String flag = null;// name of the data set
private int rowNum = 1000;// row of matrix A
private int colNum = 1;//cols of matrix B (for vector is one)
private int rowIndexA = 1; // row index of matrix A
private int rowIndexB = 1; // row index of matrix B
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
flag = ((FileSplit) context.getInputSplit()).getPath().getName();// Get the file name
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split(" ");
if ("matrix".equals(flag)) {
for (int i = 1; i <= colNum; i++) {
Text k = new Text(rowIndexA + "," + i);
for (int j = 0; j < tokens.length; j++) {
Text v = new Text("a," + (j + 1) + "," + tokens[j]);
context.write(k, v);
}
}
rowIndexA++;// move to next line
} else if ("vector".equals(flag)) {
for (int i = 1; i <= rowNum; i++) {
for (int j = 0; j < tokens.length; j++) {
Text k = new Text(i + "," + (j + 1));
Text v = new Text("b," + rowIndexB + "," + tokens[j]);
context.write(k, v);
}
}
rowIndexB++;// move to next line
}
}
}
public static class MatrixReducer extends
Reducer<Text, Text, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Map<String, String> mapA = new HashMap<String, String>();
Map<String, String> mapB = new HashMap<String, String>();
for (Text value : values) {
String[] val = value.toString().split(",");
if ("a".equals(val[0])) {
mapA.put(val[1], val[2]);
} else if ("b".equals(val[0])) {
mapB.put(val[1], val[2]);
}
}
int result = 0;
Iterator<String> mKeys = mapA.keySet().iterator();
while (mKeys.hasNext()) {
String mkey = mKeys.next();
if (mapB.get(mkey) == null) {
continue;
}
result += Integer.parseInt(mapA.get(mkey))
* Integer.parseInt(mapB.get(mkey));
}
context.write(key, new IntWritable(result));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//job start running
long main_start=System.currentTimeMillis();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: MatrixVector <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Matrix_Vector");
job.setJarByClass(Matrix_Vector.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(MatrixMapper.class);
job.setReducerClass(MatrixReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
if (job.waitForCompletion(true)) {
long main_end=System.currentTimeMillis();
System.out.println("The process is end");
System.out.println("Time of running is "+ " : "+(main_end - main_start) +" ms");
System.exit(0);
}else {
System.out.println("The process ended with false");
}
}
}
这是我得到的结果,在我的输出路径中有一个成功文件和一个输出文件,但是在输出文件中没有任何内容,也可以从“bytes writed=0”中看到。我找不到我的代码逻辑哪里出错了,很抱歉结果显示为代码格式,我仍然在学习如何做更好的职位。
21/03/26 08:05:08 INFO mapreduce.Job: Job job_1616694898641_0007 completed successfully
21/03/26 08:05:10 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=6
FILE: Number of bytes written=606314
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2002216
HDFS: Number of bytes written=0
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=102529
Total time spent by all reduces in occupied slots (ms)=13848
Total time spent by all map tasks (ms)=102529
Total time spent by all reduce tasks (ms)=13848
Total vcore-milliseconds taken by all map tasks=102529
Total vcore-milliseconds taken by all reduce tasks=13848
Total megabyte-milliseconds taken by all map tasks=104989696
Total megabyte-milliseconds taken by all reduce tasks=14180352
Map-Reduce Framework
Map input records=2000
Map output records=0
Map output bytes=0
Map output materialized bytes=12
Input split bytes=212
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=12
Reduce input records=0
Reduce output records=0
Spilled Records=0
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=658
CPU time spent (ms)=4140
Physical memory (bytes) snapshot=519413760
Virtual memory (bytes) snapshot=6241820672
Total committed heap usage (bytes)=263872512
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2002004
File Output Format Counters
Bytes Written=0
The process is end
Time of running is : 182011 ms
任何帮助都将不胜感激!
谢谢您,
文
暂无答案!
目前还没有任何答案,快来回答吧!