首先,我们来看看Hadoop的计算框架特性,在此特性下会衍生哪些问题?
面对这些问题,我们能有哪些有效的优化手段呢?下面列出一些在工作有效可行的优化手段:
而接下来,我们心中应该会有一些疑问,影响性能的根源是什么?
我们知道了性能低下的根源,同样,我们也可以从Hive的配置解读去优化。Hive系统内部已针对不同的查询预设定了优化方法,用户可以通过调整配置进行控制, 以下举例介绍部分优化的策略以及优化控制选项。
Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。 例如,若有以下查询:
SELECT a,b FROM q WHERE e<10;
可以在查询的过程中减少不必要的分区。 例如,若有以下查询:
SELECT * FROM (SELECTT a1,COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100;
-- 多余分区
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;
在编写带有 join 操作的代码语句时,应该将条目少的表/子查询放在 Join 操作符的左边。 因为在 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,载入条目较少的表 可以有效减少 OOM(out of memory)即内存溢出。所以对于同一个 key 来说,对应的 value 值小的放前,大的放后,这便是“小表放前”原则。 若一条语句中有多个 Join,依据 Join 的条件相同与否,有不同的处理方法。
3.1 JOIN原则
在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。对于一条语句中有多个 Join 的情况,如果 Join 的条件相同,比如查询:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
如果 Join 的条件不相同,比如:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
Map-Reduce 的任务数目和 Join 操作的数目是对应的,上述查询和以下查询是等价的:
INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view p JOIN user u
ON (pv.userid = u.userid);
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x
JOIN newuser y ON (x.age = y.age);
Join 操作在 Map 阶段完成,不再需要Reduce,前提条件是需要的数据在 Map 的过程中可以访问到。比如查询:
INSERT OVERWRITE TABLE pv_users
SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
FROM page_view pv
JOIN user u ON (pv.userid = u.userid);
hive.join.emit.interval = 1000
hive.mapjoin.size.key = 10000
hive.mapjoin.cache.numrows = 10000
进行GROUP BY操作时需要注意一下几点:
5.1 Map端部分聚合
事实上并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。
# 用于设定是否在 map 端进行聚合,默认值为真
hive.map.aggr=true
# 用于设定 map 端进行聚合操作的条目数
hive.groupby.mapaggr.checkinterval=100000
5.2 有数据倾斜时进行负载均衡
此处需要设定 hive.groupby.skewindata,当选项设定为 true 是,生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。
我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。
# 是否合并Map输出文件,默认值为真
hive.merge.mapfiles=true
# 是否合并Reduce 端输出文件,默认值为假
hive.merge.mapredfiles=false
# 合并文件的大小,默认值为 256000000
hive.merge.size.per.task=256*1000*1000
熟练地使用 SQL,能写出高效率的查询语句。
场景:有一张 user 表,为卖家每天收到表,user_id,ds(日期)为 key,属性有主营类目,指标有交易金额,交易笔数。每天要取前10天的总收入,总笔数,和最近一天的主营类目。
1.1 解决方法一
如下所示:常用方法
INSERT OVERWRITE TABLE t1
SELECT user_id,substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users
-- 20120329为日期列的值,实际代码中可以用函数表示出当天日期
WHERE ds=20120329
GROUP BY user_id;
INSERT OVERWRITE TABLE t2
SELECT user_id,sum(qty) AS qty,SUM(amt) AS amt FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id
SELECT t1.user_id,t1.main_cat,t2.qty,t2.amt FROM t1
JOIN t2 ON t1.user_id=t2.user_id
下面给出方法一的思路,实现步骤如下:
第一步:利用分析函数,取每个 user_id 最近一天的主营类目,存入临时表 t1。
第二步:汇总 10 天的总交易金额,交易笔数,存入临时表 t2。
第三步:关联 t1,t2,得到最终的结果。
1.2 解决方法二
如下所示:优化方法
SELECT user_id,substr(MAX(CONCAT(ds,cat)),9) AS main_cat,SUM(qty),SUM(amt) FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id
问题:日志中常会出现信息丢失,比如每日约为 20 亿的全网日志,其中的 user_id 为主 键,在日志收集过程中会丢失,出现主键为 null 的情况,如果取其中的 user_id 和 bmw_users 关联,就会碰到数据倾斜的问题。原因是 Hive 中,主键为 null 值的项会被当做相同的 Key 而分配进同一个计算 Map。
2.1 解决方法一
如下所示:user_id 为空的不参与关联,子查询过滤 null
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION All SELECT * FROM log a WHERE a.user_id IS NULL;
**2.2 解决方法二 **
如下所示:函数过滤 null
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id END =b.user_id;
一张表 s8 的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。 s8 的日志中有 32 为字符串商品 id,也有数值商品 id,日志中类型是 string 的,但商品中的 数值 id 是 bigint 的。猜想问题的原因是把 s8 的商品 id 转成数值 id 做 hash 来分配 Reduce, 所以字符串 id 的 s8 日志,都到一个 Reduce 上了,解决的方法验证了这个猜测。
3.1 解决方法
把数据类型转换成字符串类型
SELECT * FROM s8_log a LEFT OUTERJOIN r_auction_auctions b ON a.auction_id=CASE(b.auction_id AS STRING);
问题:比如推广效果表要和商品表关联,效果表中的 auction_id 列既有 32 为字符串商 品 id,也有数字 id,和商品表关联得到商品的信息。
4.1 解决方法
Hive SQL 性能会比较好
SELECT * FROM effect a
JOIN
(SELECT auction_id AS auction_id FROM auctions
UNION All
SELECT auction_string_id AS auction_id FROM auctions) b
ON a.auction_id=b.auction_id;
Hive 对 union all 的优化的特性:对 union all 优化只局限于非嵌套查询。
5.1 消灭子查询内的 group by
SELECT * FROM
(SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3)t3
GROUP BY c1,c2,c3;
从业务逻辑上说,子查询内的 GROUP BY 怎么都看显得多余(功能上的多余,除非有 COUNT(DISTINCT)),如果不是因为 Hive Bug 或者性能上的考量(曾经出现如果不执行子查询 GROUP BY,数据得不到正确的结果的 Hive Bug)。所以这个 Hive 按经验转换成如下所示:
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2)t3 GROUP BY c1,c2,c3;
5.2 消灭子查询内的 COUNT(DISTINCT),MAX,MIN
SELECT * FROM
(SELECT * FROM t1
UNION ALL SELECT c1,c2,c3 COUNT(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3;
由于子查询里头有 COUNT(DISTINCT)操作,直接去 GROUP BY 将达不到业务目标。这时采用 临时表消灭 COUNT(DISTINCT)作业不但能解决倾斜问题,还能有效减少 jobs。
INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
(SELECT c1,c2,c3,income,0 AS uv FROM t1
UNION ALL
SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2) t3
GROUP BY c1,c2,c3;
5.3 消灭子查询内的 JOIN
SELECT * FROM
(SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x
GROUP BY c1,c2;
上面代码运行会有 5 个 jobs。加入先 JOIN 生存临时表的话 t5,然后 UNION ALL,会变成 2 个 jobs。
INSERT OVERWRITE TABLE t5
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);
计算 uv 的时候,经常会用到 COUNT(DISTINCT),但在数据比较倾斜的时候 COUNT(DISTINCT) 会比较慢。这时可以尝试用 GROUP BY 改写代码计算 uv。
6.1 原有代码
INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329)
SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv
FROM s_ods_log_tanx_pv t
WHERE t.ds=20120329
GROUP BY adzoneid;
6.2 测试用例
/*
* 关于COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:
* 测试数据:169857条
*/
-- 统计每日IP,耗时:24.805 seconds
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS IP
FROM logdfs WHERE logdate=’2014_12_29′;
-- 统计每日IP(改造),耗时:46.833 seconds
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip
from logdfs WHERE logdate=’2014_12_29′) tmp;
优化时,把hive sql当做mapreduce程序来读,会有意想不到的惊喜。理解hadoop的核心能力,是hive优化的根本。这是这一年来,项目组所有成员宝贵的经验总结。
优化时把握整体,单个作业最优不如整体最优。
(2)这个参数控制最大的reducer的数量,如果 input / bytes per reduce > max ,则会启动这个参数所指定的reduce个数。这个并不会影响mapre.reduce.tasks参数的设置,默认的max是999。
hive.exec.reducers.max
(3)这个参数如果指定了,hive就不会用它的estimation函数来自动计算reduce的个数,而是用这个参数来启动reducer,默认是-1。
mapred.reduce.tasks
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://volitation.blog.csdn.net/article/details/83820860
内容来源于网络,如有侵权,请联系作者删除!