我有一些正确选择源代码和最高权重的代码。我似乎不能把目标列也拉进来。有人能给我指出正确的方向吗?我以前从未使用过java。我认为reducer函数需要返回一个元组。因此,mapper函数中的变量目标是否需要这个元组?
所需输出:每行包含一个节点id,后跟一个tab(\t)和所需的“tgt,weight”元组。元组是权重最高的tgt。在平局的情况下,用最小的数字返回tgt。
输入
src tgt weight
1 110 3
1 200 1
20 150 30
10 110 10
11 130 15
11 200 67
1 70 3
预期产量
1 70,3
20 150,30
10 110,10
11 200,67
当前输出(需要作为tuple添加到tgt列中)
1 3
20 30
10 10
11 67
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Q1 {
public static class TargetMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text target = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString(), "\r");
while (st.hasMoreTokens()) {
String[] edge = st.nextToken().split("\t");
target.set(edge[0]);
context.write(target, new IntWritable(Integer.parseInt(edge[2])));
}
}
}
public static class EmailsReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable totalCount = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> targets, Context context) throws IOException, InterruptedException{
int max = 0;
for (IntWritable target : targets) {
if(target.get() > max || max ==0) {
max = target.get();
}
}
totalCount.set(max);
context.write(key, totalCount);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Q1");
job.setJarByClass(Q1.class);
job.setMapperClass(TargetMapper.class);
job.setCombinerClass(EmailsReducer.class);
job.setReducerClass(EmailsReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1条答案
按热度按时间ftf50wuq1#
您对自定义输出感兴趣。要实现这一点,请尝试实现自定义
WritableComparable
. 你可能需要更新你的逻辑,使它根据你的需要工作。比如:
并更新代码,将其用作mapper&reducer中的值。比如: