RecordReader表示以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类,系统默认的RecordReader是LineRecordReader,它是TextInputFormat对应的RecordReader;而SequenceFileInputFormat对应的RecordReader是SequenceFileRecordReader。LineRecordReader是每行的偏移量作为读入map的key,每行的内容作为读入map的value。很多时候hadoop内置的RecordReader并不能满足我们的需求,比如我们在读取记录的时候,希望Map读入的Key值不是偏移量而是行号或者是文件名,这时候就需要我们自定义RecordReader。
**(1):**继承抽象类RecordReader,实现RecordReader的一个实例。
**(2):**实现自定义InputFormat类,重写InputFormat中的createRecordReader()方法,返回值是自定义的RecordReader实例。
**(3):**配置job.setInputFormatClass()为自定义的InputFormat实例。
#需求:统计data文件中奇数行和偶数行的和:
10
20
50
15
30
100
实现代码如下:
MyRecordReader.java:
public class MyRecordReader extends RecordReader<LongWritable, Text> {

    // Start offset of the split within the file.
    private long start;
    // End offset (exclusive) of the split within the file.
    private long end;
    // 1-based line number, used as the map input key.
    private long pos;
    // Raw input stream over the split's file.
    private FSDataInputStream fin = null;
    // Current key (line number) and value (line contents), lazily created.
    private LongWritable key = null;
    private Text value = null;
    // Line reader (org.apache.hadoop.util.LineReader) that splits the stream into lines.
    private LineReader reader = null;

    /**
     * Opens the split's file and positions the stream at the split start.
     * Because the companion InputFormat disables splitting, start is
     * normally 0 and a single reader consumes the whole file, keeping
     * line numbers globally consistent.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fileSystem = path.getFileSystem(conf);
        fin = fileSystem.open(path);
        fin.seek(start);
        reader = new LineReader(fin);
        // Line numbering starts at 1.
        pos = 1;
    }

    /**
     * Advances to the next record: key = 1-based line number,
     * value = the line's text. Returns false at end of input.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        // LineReader.readLine returns the number of bytes consumed; 0 means EOF.
        if (reader.readLine(value) == 0) {
            return false;
        }
        pos++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Reports the fraction of the split consumed, based on the byte
     * position of the underlying stream. The original stub always
     * returned 0, which breaks progress reporting in the job UI.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end || fin == null) {
            return 0.0f;
        }
        return Math.min(1.0f, (fin.getPos() - start) / (float) (end - start));
    }

    /** Releases the underlying stream; safe even if initialize() never ran. */
    @Override
    public void close() throws IOException {
        if (fin != null) {
            fin.close();
        }
    }
}
MyInputFormat.java
public class MyInputFormat extends FileInputFormat<LongWritable, Text>{
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//返回自定义的RecordReader
return new MyRecordReader();
}
/**
* 为了使得切分数据的时候行号不发生错乱
* 这里设置为不进行切分
*/
protected boolean isSplitable(FileSystem fs, Path filename) {
return false;
}
}
MyPartitioner.java
public class MyPartitioner extends Partitioner<LongWritable, Text> {

    /**
     * Routes even line numbers to partition 1 and odd line numbers to
     * partition 0. The key is rewritten in place so every record in a
     * partition shares one reduce key (0 for odd lines, 1 for even lines);
     * the partitioner runs before the key is serialized, so the rewrite
     * is what the reducer sees.
     */
    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        // The partition id doubles as the collapsed reduce key.
        int partition = (key.get() % 2 == 0) ? 1 : 0;
        key.set(partition);
        return partition;
    }
}
主类 RecordReaderTest.java
public class RecordReaderTest {

    // Input path, filled in from the command line.
    private static String IN_PATH = "";
    // Output path, filled in from the command line.
    private static String OUT_PATH = "";

    /**
     * Configures and submits the job: reads lines keyed by line number
     * via MyInputFormat, partitions odd/even lines with MyPartitioner,
     * and sums each group in the reducer.
     */
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Strip generic Hadoop options; keep the remaining user arguments.
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // Abort with a usage message when the argument count is wrong.
            if (otherArgs.length != 2) {
                System.err.println("Usage:wordcount<in> <out>");
                System.exit(1);
            }
            IN_PATH = otherArgs[0];
            OUT_PATH = otherArgs[1];
            // Delete a pre-existing output directory, otherwise the job fails at submit time.
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
            if (fileSystem.exists(new Path(new URI(OUT_PATH)))) {
                fileSystem.delete(new Path(new URI(OUT_PATH)), true);
            }
            // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
            Job job = Job.getInstance(conf, RecordReaderTest.class.getName());
            // Required so Hadoop can locate the job jar when run on a cluster.
            job.setJarByClass(RecordReaderTest.class);
            // 1.1 Input path and the custom input format (line-number keys).
            FileInputFormat.setInputPaths(job, IN_PATH);
            job.setInputFormatClass(MyInputFormat.class);
            // 1.2 Mapper class and its output key/value types.
            job.setMapperClass(RecordReaderMapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            // 1.3 Partitioner plus matching reducer count (two partitions: odd/even).
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(2);
            // 1.4 sort / 1.5 combine: defaults.
            // 2.1 Shuffle copies map output to the reduce side.
            // 2.2 Reducer class and final output key/value types.
            job.setReducerClass(RecordReaderReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // 2.3 Output path and output format.
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
            job.setOutputFormatClass(TextOutputFormat.class);
            // Submit and exit with the job's success status.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Identity mapper: forwards (lineNumber, lineText) unchanged. */
    public static class RecordReaderMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException,
                InterruptedException {
            context.write(key, value);
        }
    }

    /** Sums the numeric line values of each group (key 0 = odd lines, 1 = even lines). */
    public static class RecordReaderReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
        // Reused output key/value objects (standard Hadoop object-reuse pattern).
        private Text outKey = new Text();
        private LongWritable outValue = new LongWritable();

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
                InterruptedException {
            System.out.println("奇数行还是偶数行:" + key);
            // Accumulate the sum of all values in this group.
            long sum = 0;
            for (Text val : values) {
                sum += Long.parseLong(val.toString());
            }
            // Key 0 holds odd-line records, key 1 even-line records (set by MyPartitioner).
            if (key.get() == 0) {
                outKey.set("奇数之和为:");
            } else {
                outKey.set("偶数之和为:");
            }
            outValue.set(sum);
            context.write(outKey, outValue);
        }
    }
}
程序运行结果:
原文转自:https://blog.csdn.net/lzm1340458776/article/details/43054037
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://marco.blog.csdn.net/article/details/81629454
内容来源于网络,如有侵权,请联系作者删除!