在编写MapReduce程序时,需要考虑的几个方面:
输入数据接口:InputFormat
逻辑处理接口Mapper
用户根据业务需求实现其中的三个方法map(),setuo(),cleanup()
Partitioner分区
有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号
key.hashCode()&Integer.MAXVALUE % numReduces
如果业务上有特殊需求,可以自定义分区
Comparable排序
Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输,但是使用时必须不能影响原有的业务处理结果
Reduce端分组:Groupingcomparator
逻辑处理接口Reduce
用户根据业务需求实现其中三个方法:reduce(),setup(),cleanup()
输出数据接口:OutputFormat
基本原则:
压缩格式 | hadoop自带? | 算法 | 文件扩展名 | 是否可切分 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEDAULT | 是,直接使用 | DEFAULT | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFAULT | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 否,需要安装 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示
压缩格式 | 对应的编码/解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/S | 58MB/S |
bzip2 | 8.3GB | 1.1GB | 2.4MB/S | 9.5MB/S |
LZO | 8.3GB | 2.9GB | 49.3MB/S | 74.6MB/S |
Gzip压缩
优点
压缩率比较高,而且压缩/解压缩的速度也比较快;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;大部分linux系统都自带gzip命令,使用方便
缺点:不支持split
应用场景:当每个文件压缩之后在130M以内的(一个块大小内),都可以考虑用gzip压缩格式。例如说,一天或者一个小时的日志压缩成一个gzip文件,运行mapReduce程序的时候通过多个gzip文件达到并发。hive程序,streaming程序,和java写的mapReduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改
Bzip2压缩
优点:
支持split,具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native(java和c互操作的API接口);在linux系统下自带bzip2命令,使用方便
缺点:压缩/解压缩速度慢;不支持native
应用场景:
适合对速度要求不高,但需要较高的压缩率的时候,可以作为MapReduce作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用的比较少的情况;或者对单个很大的文本文件向压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不要要修改)的情况
Lzo压缩
优点:
压缩/解压的速度比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;可以在linux西永下安装lzop命令,使用方便
缺点:
压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
应用场景:
一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越明显
Snappy压缩
压缩可以在MapReduce作用的任意阶段启用
要在Hadoop中启用压缩,可以配置如下参数
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs (在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 使用LZO或snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress. DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) | RECORD | reducer输出 | SequenceFile输出使用的压缩类型:NONE和BLOCK |
CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。想要对正在被写入的一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream,将其以压缩格式写入底层的流。相反,要想对从输入流读取而来的数据进行压缩,则调用createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,从而从底层的流读取未压缩的数据
public class TestCompress {
public static void main(String[] args) throws Exception{
// compress("G:\\学习\\maven\\src\\main\\java\\Compress\\kkkk.doc","org.apache.hadoop.io.compress.BZip2Codec");
decompress("G:\\学习\\maven\\src\\main\\java\\Compress\\kkkk.doc.bz2",".doc");
}
/**
* 压缩方法
* @param filename 文件路径+文件名
* @param methon 解码器
*/
private static void compress(String filename,String methon) throws Exception {
//创建输入流
FileInputStream fis = new FileInputStream(new File(filename));
//通过反射类找到解码器的类
Class<?> codeClass = Class.forName(methon);
//通过反射工具类找到解码器对象,需要用到配置conf对象
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, new Configuration());
//创建输出流
FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
//获得解码器输出对象
CompressionOutputStream cos = codec.createOutputStream(fos);
//流拷贝
IOUtils.copyBytes(fis,cos,5*1024*1024,false);
//关闭流
cos.close();
fos.close();
fis.close();
}
/**
* 解开压缩
* @param filename 文件路径 + 文件名
* @param decoded 后缀
* @throws Exception
*/
private static void decompress(String filename,String decoded)throws Exception{
//获取factory实例
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
CompressionCodec codec = factory.getCodec(new Path(filename));
if (codec == null){
System.out.println(filename);
return;
}
//解压缩的输入
CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
//输出流
FileOutputStream fos = new FileOutputStream(new File(filename + "." + decoded));
//流拷贝
IOUtils.copyBytes(cis,fos,5*1024*1024,false);
//关闭流
cis.close();
fos.close();
}
}
即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可
给大家提供的hadoop源码支持的压缩格式有:BZip2Codec 、DefaultCodec
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"G:\\学习\\maven\\src\\main\\java\\Compress\\wordcount.txt","G:\\学习\\maven\\src\\main\\java\\Compress\\out"};
Configuration configuration = new Configuration();
// 开启map端输出压缩
configuration.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(CompressMapper.class);
job.setReducerClass(CompressReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 1 : 0);
}
}
mapper保持不变
public class CompressMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取一行
String line = value.toString();
//2.切割
String[] words = line.split(" ");
//3.循环写出
for(String word : words){
context.write(new Text(word),new IntWritable(1));
}
}
}
reduce保持不变
public class CompressReduce extends Reducer<Text, IntWritable, Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
//1.汇总
for(IntWritable value:values){
count+= value.get();
}
//2.输出
context.write(key,new IntWritable(count));
}
}
基于wordcount案例处理
public class WordCountDriver1 {
public static void main(String[] args) throws Exception {
args = new String[]{"G:\\学习\\maven\\src\\main\\java\\Compress\\wordcount.txt","G:\\学习\\maven\\src\\main\\java\\Compress\\out"};
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver1.class);
job.setMapperClass(CompressMapper.class);
job.setReducerClass(CompressReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
boolean result = job.waitForCompletion(true);
System.exit(result?1:0);
}
}
mapper和Reducer保持不变
计算机性能
cpu、内存、磁盘健康、网络
I/O操作优化
MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、I/O传输、数据倾斜问题和常用的调优参数
数据倾斜现象
数据频率倾斜——>某一个区域的数据量要远远大于其他区域
数据大小倾斜——>部分记录的大小远远大于平均值
如何收集倾斜数据
在reduce方法中加入记录map输入键的详细情况的功能
public static final String MAX_VALUES = "skew.maxvalues";
private int maxValueThreshold;
@Override
public void configure(JobConf job) {
maxValueThreshold = job.getInt(MAX_VALUES, 100);
}
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
int i = 0;
while (values.hasNext()) {
values.next();
i++;
}
if (++i > maxValueThreshold) {
log.info("Received " + i + " values for key " + key);
}
}
减少数据倾斜的方法
方法1:抽样和范围分区
可以通过对原始数据进行抽样的到的结果集来预设分区边界值
方法2:自定义分区
基于输出键的背景知识进行自定义分区。例如,如果map输出键的单词源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这些专业词汇发送给固定的一部分reduce示例。而将其他的都发送给剩余的reduce实例
方法3:Combine
使用Combine可以大量低减少数据倾斜。在可能的情况下,combine的目的就是聚合并精简数据。
方法4:采用MapJoin,尽量避免ReduceJoin
资源相关参数
配置参数 | 参数说明 |
---|---|
mapreduce.map.memory.mb | 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.reduce.memory.mb | 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.map.cpu.vcores | 每个Map task可使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.cpu.vcores | 每个Reduce task可使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.shuffle.parallelcopies | 每个reduce去map中拿数据的并行数。默认值是5 |
mapreduce.reduce.shuffle.merge.percent | buffer中的数据达到多少比例开始写入磁盘。默认值0.66 |
mapreduce.reduce.shuffle.input.buffer.percent | buffer大小占reduce可用内存的比例。默认值0.7 |
mapreduce.reduce.input.buffer.percent | 指定多少比例的内存用来存放buffer中的数据,默认值是0.0 |
配置参数 | 参数说明 |
---|---|
yarn.scheduler.minimum-allocation-mb 1024 | 给应用程序container分配的最小内存 |
yarn.scheduler.maximum-allocation-mb 8192 | 给应用程序container分配的最大内存 |
yarn.scheduler.minimum-allocation-vcores 1 | 每个container申请的最小CPU核数 |
yarn.scheduler.maximum-allocation-vcores 32 | 每个container申请的最大CPU核数 |
yarn.nodemanager.resource.memory-mb 8192 | 给containers分配的最大物理内存 |
配置参数 | 参数说明 |
---|---|
mapreduce.task.io.sort.mb 100 | shuffle的环形缓冲区大小,默认100m |
mapreduce.map.sort.spill.percent 0.8 | 环形缓冲区溢出的阈值,默认80% |
容错相关参数(mapreduce性能优化)
配置参数 | 参数说明 |
---|---|
mapreduce.map.maxattempts | 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.reduce.maxattempts | 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.task.timeout | Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。 |
HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为15byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大导致索引速度变慢
Hadoop Archive:
是一个高效地将小文件放入HDFS块中的文件存档工具,他能够将多个小文件打包成一个HAR文件,这样就减少了namenode的内存使用
Sequence File
Sequence File 由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件
CombineFileInputFormat
CombineFileInputFormat是一种新的inputFormat,用于将多个文件合并成一个单独的split,另外,他会考虑数据的存储位置
开启JVM重用
对于大量小文件Job,可以开启JVM重用,会减少45%的运行时间
JVM重用理解:一个map运行一个jvm,重用的话,一个map在JVM上运行完毕之后,JVM继续运行其他map
具体设置:mapreduce.job.jvm.numtasks值在10-20之间。
内容来源于网络,如有侵权,请联系作者删除!