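I'm trying to pass two values from the mapper to the reducer by packing them into a single string, but when I parse that string in the reducer I get an index-out-of-bounds error. I build the string in the mapper myself, so I'm sure it contains two values. What am I doing wrong? How can I pass two values from the mapper to the reducer? (Eventually I need to pass more variables to the reducer, but sticking to two keeps the problem simpler.)
This is the error: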
Error: java.lang.ArrayIndexOutOfBoundsException: 1
at TotalTime$TimeReducer.reduce(TotalTime.java:57)
at TotalTime$TimeReducer.reduce(TotalTime.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:628)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
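`ArrayIndexOutOfBoundsException: 1` means the reducer read `field[1]` from an array with fewer than two elements, i.e. `line.split(",")` found no comma in that value. Below is a minimal plain-Java sketch (the `DelimitedValue` class and its `encode`/`decode` methods are hypothetical helpers, not a Hadoop API) of packing several ints into one comma-separated string and unpacking them with an explicit field-count check, so a malformed value fails with a message that shows the offending string:

```java
import java.util.Arrays;

// Hypothetical helper (not a Hadoop API): packs several ints into one
// comma-separated string, the way the mapper builds its Text value, and
// unpacks them with an explicit field-count check.
public class DelimitedValue {

    private static final String SEP = ",";

    // Join the values with commas, e.g. encode(56451, 58904) -> "56451,58904".
    public static String encode(int... values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                sb.append(SEP);
            }
            sb.append(values[i]);
        }
        return sb.toString();
    }

    // Split the string and parse each field; fail with a descriptive message
    // if the number of fields is not the one the caller expects.
    public static int[] decode(String s, int expectedFields) {
        String[] parts = s.split(SEP);
        if (parts.length != expectedFields) {
            throw new IllegalArgumentException("expected " + expectedFields
                    + " fields but got " + parts.length + " in \"" + s + "\"");
        }
        int[] out = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = Integer.parseInt(parts[i].trim());
        }
        return out;
    }

    public static void main(String[] args) {
        String packed = encode(56451, 58904);
        System.out.println(packed);                             // 56451,58904
        System.out.println(Arrays.toString(decode(packed, 2))); // [56451, 58904]
    }
}
```

For more than a handful of fields, a custom value class implementing Hadoop's `org.apache.hadoop.io.Writable` interface is a more structured alternative to delimited strings.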
Here is my code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TotalTime {
    public static class TimeMapper extends Mapper<Object, Text, Text, Text> {
        Text textKey = new Text();
        Text textValue = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String data = value.toString();
            String[] field = data.split(",");
            if (null != field && field.length == 4) {
                String strTimeIn[] = field[1].split(":");
                String strTimeOout[] = field[2].split(":");
                int timeOn = Integer.parseInt(strTimeIn[0]) * 3600 + Integer.parseInt(strTimeIn[1]) * 60 + Integer.parseInt(strTimeIn[2]);
                int timeOff = Integer.parseInt(strTimeOout[0]) * 3600 + Integer.parseInt(strTimeOout[1]) * 60 + Integer.parseInt(strTimeOout[2]);
                String v = String.valueOf(timeOn) + "," + String.valueOf(timeOff);
                textKey.set(field[0]);
                textValue.set(v);
                context.write(textKey, textValue);
            }
        }
    }
    public static class TimeReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Text textValue = new Text();
            int sumTime = 0;
            for (Text val : values) {
                String line = val.toString();
                // Split the string by commas
                String[] field = line.split(",");
                int timeOn = Integer.parseInt(field[0]);
                int timeOff = Integer.parseInt(field[1]);
                int time = timeOff - timeOn;
                sumTime += time;
            }
            String v = String.valueOf(sumTime);
            textValue.set(v);
            context.write(key, textValue);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "User Score");
        job.setJarByClass(TotalTime.class);
        job.setMapperClass(TimeMapper.class);
        job.setCombinerClass(TimeReducer.class);
        job.setReducerClass(TimeReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The input file looks like this:
ID2347,15:40:51,16:21:44,20
ID4568,14:27:57,14:58:04,72
ID8755,13:40:49,13:42:31,99
ID3258,13:12:48,13:37:11,73
ID9666,13:44:34,15:53:36,114
ID8755,09:43:59,10:47:52,123
ID3258,10:25:22,10:41:12,14
ID9666,09:40:10,11:44:01,15
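To know what the job should produce once it works, the same computation can be done in plain Java without Hadoop. This is a sketch under the assumption that the intended result is, per ID, the sum of (time out minus time in) in seconds, using the same arithmetic as `TimeMapper`; the `ExpectedTotals` class is hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Hadoop) of what the job is meant to compute for the
// sample input: per-ID total of (time out - time in), in seconds.
public class ExpectedTotals {

    public static final String[] SAMPLE = {
        "ID2347,15:40:51,16:21:44,20",
        "ID4568,14:27:57,14:58:04,72",
        "ID8755,13:40:49,13:42:31,99",
        "ID3258,13:12:48,13:37:11,73",
        "ID9666,13:44:34,15:53:36,114",
        "ID8755,09:43:59,10:47:52,123",
        "ID3258,10:25:22,10:41:12,14",
        "ID9666,09:40:10,11:44:01,15",
    };

    // "HH:mm:ss" -> seconds since midnight, same arithmetic as TimeMapper.
    public static int toSeconds(String hms) {
        String[] p = hms.split(":");
        return Integer.parseInt(p[0]) * 3600 + Integer.parseInt(p[1]) * 60 + Integer.parseInt(p[2]);
    }

    // Sum the durations per ID; TreeMap keeps the IDs in sorted order.
    public static Map<String, Integer> totals(String[] lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            if (f.length != 4) {
                continue; // skip malformed lines, as the mapper does
            }
            int duration = toSeconds(f[2]) - toSeconds(f[1]);
            sums.merge(f[0], duration, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // One tab-separated "ID<TAB>total" line per ID.
        totals(SAMPLE).forEach((id, sum) -> System.out.println(id + "\t" + sum));
    }
}
```

With the sample input above this prints `ID2347 2453`, `ID3258 2413`, `ID4568 1807`, `ID8755 3935` and `ID9666 15173` (tab-separated, one per line).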
Answer:
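It seems the combiner is what makes your code fail. Remember that a combiner is a piece of code that runs on the mapper's output before it reaches the reducer (and Hadoop may run it zero, one, or several times). Now imagine this scenario: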
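Your mapper processes a line such as the first line of your input:
ID2347,15:40:51,16:21:44,20
and writes the following output to the context:
key: ID2347, value: 56451,58904
Now the combiner kicks in. It processes the mapper's output before the reducer sees it and produces:
key: ID2347, value: 2453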
That record is then sent to the reducer, which fails because your code assumes every value looks like this:
val1,val2
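The scenario above can be reproduced in plain Java without Hadoop. In this sketch, the hypothetical `reduceLikeTimeReducer` method copies the parsing and arithmetic of `TimeReducer.reduce`; calling it once plays the combiner and calling it again on that result plays the reducer:

```java
// Plain-Java reproduction (no Hadoop) of the data flow described above:
// mapper value -> TimeReducer used as combiner -> TimeReducer used as reducer.
public class CombinerFailure {

    // Same parsing and arithmetic as TimeReducer.reduce, applied to the values.
    public static String reduceLikeTimeReducer(String... values) {
        int sumTime = 0;
        for (String line : values) {
            String[] field = line.split(",");
            int timeOn = Integer.parseInt(field[0]);
            int timeOff = Integer.parseInt(field[1]); // throws when the value has no comma
            sumTime += timeOff - timeOn;
        }
        return String.valueOf(sumTime);
    }

    public static void main(String[] args) {
        String mapOutput = "56451,58904";                  // mapper value for ID2347
        String combined = reduceLikeTimeReducer(mapOutput); // combiner pass: "2453"
        System.out.println("combiner output: " + combined);
        try {
            reduceLikeTimeReducer(combined);                // reducer pass on "2453"
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("reducer failed: " + e);
        }
    }
}
```

The exact exception message depends on the JDK version, but the cause is the same: `"2453".split(",")` returns a one-element array, so `field[1]` is out of bounds.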
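If you want the code to work, just remove the combiner (or change its logic). Whatever the combiner emits is fed to the reducer, so its output values must have the same format as the mapper's output values.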
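One way to "change the logic", sketched here as an assumption rather than the only fix: have the mapper emit the duration (`timeOff - timeOn`) as a single number and have the reducer just add numbers. Because the input and output formats then match and addition does not care about grouping or order, the same summing code is safe as a combiner however many times Hadoop applies it. Plain Java, no Hadoop; `SafeCombiner`, `mapValue` and `sum` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: the mapper emits one number (the duration), and the reducer and
// combiner only sum numbers, so combiner output has the same format as
// mapper output. Plain Java, no Hadoop; all names are hypothetical.
public class SafeCombiner {

    // What the mapper would write as the value: one number, no comma.
    public static String mapValue(int timeOn, int timeOff) {
        return String.valueOf(timeOff - timeOn);
    }

    // Reducer/combiner logic: parse each value as a single int and add them up.
    public static String sum(List<String> values) {
        int total = 0;
        for (String v : values) {
            total += Integer.parseInt(v);
        }
        return String.valueOf(total);
    }

    public static void main(String[] args) {
        // Map output for ID8755 from the sample input (two records, in seconds).
        List<String> mapped = Arrays.asList(mapValue(49249, 49351), mapValue(35039, 38872));

        // Without a combiner: the reducer sees the raw map output.
        String noCombiner = sum(mapped);

        // With a combiner run once per record, then the reducer on the partial sums.
        List<String> partials = new ArrayList<>();
        for (String v : mapped) {
            partials.add(sum(Arrays.asList(v)));
        }
        String withCombiner = sum(partials);

        System.out.println(noCombiner + " " + withCombiner); // 3935 3935
    }
}
```

In the actual job, the simplest alternative is to delete the `job.setCombinerClass(TimeReducer.class);` line. Using `IntWritable` instead of `Text` for a single numeric value would avoid the string round-trip, but the job's output value class would then have to be changed to match.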