我正在尝试用条带(stripes)法统计单词对的出现次数,下面是我使用的代码。当我在 HDFS(Hadoop)上运行它时,出现了如下错误:初始化所有收集器失败。上一个收集器中的错误为:null
mapper 函数本身工作正常:我把 context.write 换成打印语句,并在本地单独运行 mapper 函数,结果得到了正确的单词对。
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Task1aStrips {
public static class StripsMapper extends Mapper<Object, Text, Text, Map> {
HashMap<String, Integer> m = new HashMap<String, Integer>();
public void map(Object key, Text value, Context context) {
String line[] = value.toString().split("\n");
String[] words = null;
for (int k = 0; k < line.length; k++) {
words = line[k].split(" ");
int start = 0;
int end = 0;
int neighbour = words.length - 1;
String emitPair = "";
for (int i = 0; i < words.length; i++) {
if (i - neighbour < 0) {
start = 0;
} else {
start = i - neighbour;
}
if (i + neighbour >= words.length) {
end = words.length - 1;
} else {
end = i + neighbour;
}
for (int j = start; j <= end; j++) {
if (i == j) {
continue;
} else {
if (m.containsKey(words[j])) {
int sum = (int) m.get(words[j]) + 1;
m.put(words[j], sum);
} else {
m.put(words[j], 1);
}
}
}
try {
context.write(new Text(words[i]), m);
} catch (IOException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
public static class StripsReducer extends Reducer<Text, Map, Text, Map> {
public void reduce(Text key, Iterable<Map> values, Context context) throws IOException, InterruptedException {
Map reducermap = new HashMap<String, Integer>();
for (Map multimap : values) {
Iterator<Map.Entry<String, Integer>> iterator = multimap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Integer> entry = iterator.next();
String word = entry.getKey() + "";
Integer count = entry.getValue();
if (reducermap.containsKey(word)) {
int sum = (int) reducermap.get(word) + 1;
reducermap.put(word, sum);
} else {
reducermap.put(word, 1);
}
}
}
context.write(key, reducermap);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "TaskA");
job.setJarByClass(Task1aStrips.class);
job.setMapperClass(StripsMapper.class);
job.setCombinerClass(StripsReducer.class);
job.setReducerClass(StripsReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Map.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
请帮帮我!
暂无答案!
目前还没有任何答案,快来回答吧!