一、Hive简介
1.1 hive是什么
(1) Hive是一个SQL解析引擎,将SQL语句转化成MR Job。
(2) Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是hadoop的目录文件,达到元数据与数据存储分离的目的。
(3) hive本身不存储数据,完全依赖于hdfs和mapreduce。
(4) hive的内容是读多写少,不支持对数据的改写和删除。
(5) hive中没有定义专门的数据格式,由用户指定,需要指定三个属性:
· 列分隔符 空格,',','\t'
· 行分隔符 '\n' (etl处理时,防止数据串行)
· 读取文件数据的方法
1.2 Hive VS 传统数据库
Hive 和传统数据库除了拥有类似的查询语言,再无类似之处。
1)数据存储位置
Hive 存储在 HDFS ,默认的计算框架是MapReduce。数据库将数据保存在块设备或者本地文件系统中,是自己设计的计算模型。
2)数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
3)执行延迟
Hive 为海量数据做数据挖掘设计的,执行延迟较高。关系型数据库为实时查询的业务设计的,执行延迟低。当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
4)数据规模
Hive很容易扩展自己的存储能力和计算能力,支持大规模的数据计算;数据库可以支持的数据规模较小。
1.3 hive的架构
提交Hql的流程(如何将Hive与hadoop联系起来):
Hive端:
用户提交executeQuery给Driver端,Driver端生成一个plan给Compiler,Compiler进行编译之后,向MetaStore获取元数据信息,MetaStore将元数据信息(sql以及对应字段)返回给compiler,compiler将plan(解析出来的语法树)反馈给driver端,driver端以任务的形式,将executePlan提交给ExecutionEngine。
Hadoop端:
JobTracker收到ExecutionEngine提交的executionJob后,namenode和JobTracker将任务分发给TaskTracker,执行Map和Reduce操作,终结果反馈给ExecutionEngine,将其结果展示给用户。
二、Hive实操
2.1创建表
2.1.1内部表
数据来源:
链接:https://pan.baidu.com/s/1P1LzgCLppVTl9T-R1zf8Ow
提取码:hwln
内部表创建:
create table article(sentence string) row format delimited fields terminated by ‘\n’;
表描述:
describe article;
将虚拟机本地的文件载入article表中:
load data local inpath ‘/home/badou/data/hive/The_man_of_property.txt’ into table article;
数据由hive管理,文件路径是hive-site.xml配置warehouse的对应badou这个库的路径$HIVE_HOME/warehouse/badou.db/article
Hive的warehouse里面都是内部表。
2.1.2外部表
当数据已经放在hdfs上面,不移动数据到hive的warehouse文件夹下面,通过外部表的方式指定hdfs上面的路径,利用hql也可以访问数据。
外部表创建时EXTERNAL修饰,代码如下:
create external table art_ext(sentence string comment ‘article’)
comment ‘article’
row format delimited fields terminated by ‘\n’
stored as textfile
location ‘/data’; --hdfs文件夹路径,不需要导入对应具体数据
内部表和外部表的删除测试:
article表是内部表,数据存储在hive的warehouse对应库文件夹下面;
srt_ext表是外部表,数据是指向hdfs路径下面的;
drop table article之后,warehouse对应库下面的数据文件被删除了;
drop table art_ext之后,删除的是表的元数据,而不会删除掉hdfs上面的原始数据,方便数据恢复。
2.1.3 分区表
表名/子文件夹,分区之后查询效率更快。一般会用日期、客户端分区,其他分区字段可以根据业务需要增加。
查看数据的时候打印表头:
set hive.cli.print.header = true;
- 创建分区表
create table art_dt(sentence string comment ‘文章’)
partition by (dt string)
row format delimited fields terminated by ‘\n’;
2.插入数据
insert overwrite table art_dt partition(dt=’20191206’)
select * from article limit 100;
查看分区数:
show partitions art_dt;
在Hive中,表中的一个Partiton对应于表下的一个目录,所有的Partition的数据都存储在对应的目录中
--例如:pvsI表中包含dt和city两个partition,则:
对应于dt=20191206,city=US的HDFS目录为:/warehouse/pvs/ds=20191206/city=US
对应于dt=20191207,city=CA的HDFS目录为:/warehouse/pvs/ds=20191207/city=CA
partition是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件统一管理。
- 业务场景
Hive任务一般是凌晨定时任务,比如一点执行这个sql,跑昨天一天的数据,写入对应昨天的文件夹中。离线表T+1。
ETL或统计分析的处理逻辑,写好sql,直接shell脚本里面调用执行就可以了。
2.1.4分桶表
Bucket主要作用:
- 数据sampling;
- 提升查询操作效率。可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了分桶操作。那么将保存相同列值的桶直接进行JOIN操作就可以,可以大大较少JOIN的数据量。
Bucket表需要查询当前已经在hive中的表或者partition可以进行分桶。
- 建立辅助表,bucket_num里面是1-32共32个数字。
create table bucket_num(num int)
row format delimited fields terminated by ‘\n’;
- 加载数据
load data local inpath ‘/home/badou/data/hive/bucket_num.txt’ into table bucket_num;
- 建立分区表
- 配置分区表属性:
set hive.enforce.bucketing=true; (此步须先执行)
(2) 创建分区表:
create table bucket_table(num int)
clustered by(num) into 8 buckets
row format delimited fields terminated by `\n`;
(3) 从bucket_num中导入数据:
insert overwrite table bucket_table select num from bucket_num;
上述导入数据进行读写,因为分桶8个,所以会有8个reducer。
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 8
(4) 查看表中数据:
hadoop fs -ls /usr/local/src/hive/hive-1.2.2/warehouse/badou.db/bucket_table
会发现有8个文件:000000_0、000001_0到000007_0(序号从0开始)。
(5) 采样sample
select * from bucket_table tablesample(bucket 1 out of 4 on num);
共采样2个桶。具体采样第1个桶和第5个桶;输出值:(32/24/16/8)、(12/4/28/20)。
select * from bucket_table tablesample(bucket 2 out of 8 on num);
共采样1个桶。具体采样第2个桶;输出值:(9/25/1/17)。
select * from bucket_table tablesample(bucket 1 out of 16 on num);
共采样0.5个桶。具体采样第1个桶的50%数据,输出值:(32/16)。
select * from bucket_table tablesample(bucket 3 out of 32 on num);
共采样0.25个桶。具体采样第3个桶的25%数据,输出值:(32)。
采样说明:
- 抽样语句:tablesample(bucket X out of Y on COLS)
- COLS表示分桶的字段。
3)Y必须是table总bucket数的倍数或因子。Hive根据Y的大小,决定抽样的比例(抽几个桶的数据)。
4)X表示从哪个bucket开始抽取。
假设table共有8个bucket:根据哈希取模的方式将1-32这32个数据放进对应桶编号的文件中。
比如:000000_0编号的桶:哈希取模从0开始,故0编号的桶中的数据是32/24/16/8。依次类推其他桶。
注意第1个桶编号为0。
不同桶中的数据因为shuffle过程,没有进行排序,所以每个桶中的数据是无序。
tablesample(bucket 1 out of 4 on num)
抽取(8/4=)2个bucket的数据,从第1个bucket开始抽取,即抽取第1个和第5个bucket。
tablesample(bucket 3 out of 8 on num)
抽取(8/8=)1个bucket的数据,从第3个bucket开始抽取,即抽取第3个bucket。
tablesample(bucket 1 out of 16 on num)
抽取(8/16=)0.5个bucket的数据,从第1个bucket中抽取桶中一半的数据。这里桶中一半的数据进行抽取时,默认又进行哈希,从个开始抽取。原本第1个桶(0编号的桶)中的数据为32/24/16/8,取第1个桶一半的数据时,取的是32/16,默认又进行了哈希。
2.1.5文件存储
(1) TextFile
HDFS行存储,纯文本。
(2) SequenceFile
HDFS行存储,二进制文件,以Hadoop 的标准的Writable 接口实现序列化和反序列化。
(3) RCFile
HDFS RCFile存储。对于在宽表中读取特别的几列,效率比较高。记录水平拆分,再垂直拆分。同一个数据块中,相同的列的数据依次存放。而同一条记录的数据只会保存在同一个数据块中。
(4) Parquet
---> 可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量
---> 压缩编码可以降低磁盘存储空间(由于同一列的数据类型是一样的,可以使用更高效的压缩编码(如 Run Length Encoding t Delta Encoding)进一步节约存储空间)
---> 只读取需要的列,支持向量运算,能够获取更好的扫描性能
---> Parquet 格式是 Spark SQL 的默认数据源,可通过 spark.sql.sources.default 配置。
☆数仓中使用文件存储格式?
企业里一般使用ORC或者Parquet,因为是列式存储,且压缩比非常高,所以相比于textFile,查询速度快,占用硬盘空间少。
2.2查询
数据源:
链接:https://pan.baidu.com/s/1SAiZONbey6qKidvJd8AyvQ
提取码:hcov
两张表:orders(订单表)、order_products_prior(订单商品表)
2.2.1简单查询
Join类型:
(1) inner join
语法与传统关系型数据库sql大致一致,但略有不同。标准sql的表的加载顺序是从右向左。但在hive中是从左向右加载的,所以在hive中进行sql优化时,可以把表按数据所占空间的大小,从小到大进行关联,小的表放在左边。
hive中关联支持等值连接,不支持不等值连。hive中关联逻辑运算,只支持and,不支持or。
(2) LEFT OUTER JOIN 左外连接
(3) RIGHT OUTER JOIN 右外连接
(4) FULL OUTER JOIN 全连接
(5) LEFT SEMI JOIN
(6) 笛卡尔积JOIN
查询练习:
问题(1):将orders和order_products_prior建表入hive。
create table orders(
order_id string,
user_id string,
eval_set string,
order_number string,
order_dow string,
order_hour_of_day string,
days_since_prior_order string
)
row format delimited fields terminated by ',';
load data local inpath '/home/badou/Documents/data/order_data/orders.csv' into table orders;
问题(2):每个用户有多少个订单?
select user_id,count(distinct order_id) as order_num
from orders
group by user_id;
问题(3):每个用户的每个订单有多少商品?
select a.user_id,a.order_id,b.prod_cnt
from orders a
inner join (
select order_id,count(1) as prod_cnt
from order_products__prior
group by order_id
) b on a.order_id=b.order_id
group by a.user_id;
问题(4):每个用户平均每个订单有多少商品?
select a.user_id,sum(b.prod_cnt)/count(distinct a.order_id) as avg_prod
from orders a
inner join (
select order_id,count(1) as prod_cnt
from order_products__prior
group by order_id
) b on a.order_id=b.order_id
group by a.user_id;
问题(5):每个用户在一周中的购买订单的分布?
set hive.cli.print.header=true; --打印表头
select user_id,
count(case when order_dow=0 then order_id else null end) as dow_0,
count(case when order_dow=1 then order_id else null end) as dow_1,
count(case when order_dow=2 then order_id else null end) as dow_2,
count(case when order_dow=3 then order_id else null end) as dow_3,
count(case when order_dow=4 then order_id else null end) as dow_4,
count(case when order_dow=5 then order_id else null end) as dow_5,
count(case when order_dow=6 then order_id else null end) as dow_6
from badou.orders
group by user_id;
问题(6):每个用户喜爱购买的三个product是什么,终表结构可以是3个列,或者是一个字符串
形如[user_id 'prod_,prod_top2,prod_top3' ],只有1个商品时:[user_id 'prod_']
select f.user_id,collect_list(concat_ws('_',f.product_id,cast(f.part_rank as string))) as top3_prod
from (select m.*,
row_number() over(partition by m.user_id order by m.prod_num desc) as part_rank
-- ,row_number() over(distribute by m.user_id sort by m.prod_num desc) as sort_rank
from (select a.user_id,b.product_id,count(b.product_id) as prod_num
from orders a
inner join order_products_prior b on a.order_id=b.order_id
group by a.user_id,b.product_id
) m
order by user_id,part_rank
) f
where f.part_rank<=3
group by f.user_id
limit 10;
2.2.2 hive中正则表达式
regexp_extract(str, regexp,idx)
参数解释:
str:被解析的字符串或字段名
regexp :正则表达式
idx:返回结果取表达式的哪一部分,默认值为1。
0表示把整个正则表达式对应的结果全部返回
1表示返回正则表达式中个() 对应的结果 以此类推
2.2.3日期处理函数
1)date_format函数(根据格式整理日期)
2)date_add、date_sub函数(加减日期)
3)next_day函数
4)last_day函数(求当月后一天日期)
5)collect_set函数
6)get_json_object解析json函数
2.2.4窗口函数
(1) 排序函数:
rank() 排序相同时会重复,产生跳跃,总数不变。
dense_rank() 排序相同时会重复,不产生跳跃,总数减少。
row_number() 排序相同时不重复,不产生跳跃,总数不变。
先从orders表中提取100条数据插入到rank_test表中:
create table rank_test as select user_id, order_dow from orders limit 100;
select user_id, order_dow from rank_test limit 10;
下面排序测试:
-- rank测试
select user_id,
order_dow,
rank() over(partition by user_id order by cast(order_dow as int) desc) as rk
from rank_test;
-- row_number测试
select user_id,
order_dow,
row_number () over(partition by user_id order by cast(order_dow as int) desc) as rk
from rank_test;
--dense_rank测试
select user_id,
order_dow,
dense_rank() over(partition by user_id order by cast(order_dow as int) desc) as rk
from rank_test;
窗口函数:
1)over():指定分析函数工作的数据窗口大小,数据窗口大小可能会随着行的变而变化
2)current row:当前行
3)n preceding:往前n行数据
4)n following: 往后n行数据
5)unbounded: 起点,
unbounded preceding 表示从前面的起点,
unbounded following 表示到后面的终点
6)lag(col,n):往前第n行数据
7)lead(col,n):往后第n行数据
8)ntile(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
(2) by的4种用法
1)sort by: 分区内有序;
2)order by: 全局排序,只有一个Reducer;
3)distribute by:类似MR中Partition,进行分区,结合sort by使用。
4)cluster by:当distribute by和sorts by字段相同时,可以使用cluster by方式。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
2.2.5 UDF&UDTF&UDAF
UDF: 用户自定义普通函数,1对1关系,常用于select语句;
UDAF:用户自定义聚合函数,多对1关系,常用语group by语句;
UDTF: 用户自定义表生成函数,1对多关系, 分词: 输入一句话输出多个单词。
用途:自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。
Java实现UDF,实现一个大写函数,脚本是java代码:(需要实现evaluate函数,evaluate函数支持重载。)
package badou.hive.udf
import org. apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text
public class Uppercase extends UDF{
public Text evaluate(final Text s){
return new Text(s.toString().toUpperCase())
}
}
使用HIVE命令用PYTHON实现UDF:
- 本地测试python代码跑通(pycharm):
# _*_ coding:utf-8 _*_
import re
# 提取数字和字母
p = re.compile(r'\w+')
data_path = "../Data/The_man_of_property.txt"
with open(data_path, "r", encoding='utf-8') as f:
for line in f.readlines():
word_lst = line.strip().split(" ") # 数组列表
# print(word_lst)
for word in word_lst:
re_word = p.findall(word)
if len(word) < 1:
continue
word = re_word[0].lower()
print('%s, %s' % (word, "1"))
- 集群环境hdfs测试:
因为是将hdfs中对应文件输出,逐行读进python,所以是sys.stdin输入。放在集群上面的py代码稍作调整:
import sys
import re
p = re.compile(r'\w+')
for line in sys.stdin:
word_list = line.strip().split(' ')
for word in word_list:
low_word = p.findall(word)
if len(low_word) < 1:
continue
s_low = low_word[0].lower()
print(s_low+'\t'+'1')
集群测试命令:
hadoop fs -cat /usr/local/src/hive/hive1.2.2/warehouse/badou.db/article/The_man_of_property.txt|python python_udf_test.py
- hive中建表后article表:(hive中操作)
- 添加本地py文件,也可以是hdfs文件路径
add file /home/badou/python/python_udf_test.py;
add file hdfs://host:9000/python/python_udf_test.py; (host换成自己的ip)
2) 通过transform语句,可以将hive语句中select得到的数据通过类似hadoop中streaming的方式,传给using语句中的脚本作为输入。用DISTRIBUTE BY将mapper的输出分类给reducer。要注意的是,这里的TRANSFORM的内容可以写*,但是AS( )里就不能写*,会报错。
select transform(sentence) using 'python python_udf_test.py' as (a) from article;
2.2.6 文件上传与导出
数据导入有三种方式:hdfs导入、本地导入和直接查询导入。
从hdfs上导入数据:
load data inpath '/home/hive/input/a.txt' overwrite into table tmp;
从本地导入数据:
load data local inpath '/local/hive/input/a.txt' overwrite into table tmp;
导入语句的overwrite关键字:表示加载的数据会覆盖原来的数据。
直接查询导入数据:
insert into table table1 select user_id, age from user_info limit 10;
直接查询导入也支持动态分区插入:
insert into table table1 partition(dt='$date') select * from user_info limit 10;
数据导出为本地文件:
insert overwrite local directory '/home/hive/output' select * from table;
数据导出为hdfs文件:
insert overwrite directory '/hive/output' select * from table;
三、Hive优化
1)采用分区技术
分区裁剪(partition):where 中的分区条件,会提前生效,不必特意做子查询,直接join和group by。
hive.optimize.cp=true:列裁剪
hive.optimize.prunner:分区裁剪
2) strict mode严格模式
Hive.mapred.mode=true,严格模式下不允许执行以下查询:
分区表上没有指定分区,没有limit限制的order by语句;笛卡尔积时join没有on条件。
3) 行列过滤
列处理:在SELECT中,只提取需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。
4)大小表
强制指定哪个是大表: /* +STREAMTALBE(table_name)*/
select /* +STREAMTALBE(a)*/ a.* from table_name1 a inner join table_name2 b on a.c1 = b.c1
强制指定哪个是小表放入内存:/* +MAPJOIN(table_name)*/
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
-- /*+MAPJOIN(tablelist)*/ 测试
python中测试:
dict = {'order_id':'product_id'}
for line in sys.stdin:
s = line.split(',')
user_id = s[1]
order_id = s[0]
product_id = dict.get(order_id)
print('%s%s%s'%(user_id,order_id,product_id))
hive中测试,指定t1为小表(小表好不超过1G或50万条记录),载入内存:
create table small_table_test as select * from orders where user_id='1';
select /*+MAPJOIN(t1)*/t1.user_id,t2.order_id,t2.product_id
from small_table_test t1
join order_products_prior t2 on t1.order_id=t2.order_id;
自动进行一个mapjoin的判断:
Starting to launch local task to process map join maximum memory = 518979584
Job显示:
Hadoop job information for Stage-3: number of mappers: 3; number of reducers: 0
hive会默认把左边的数据放到内存,右边的数据类似流数据进行遍历。
5) 先union all再join/group by操作,尽管有多个select,但是只有一个reducer。
union all过程没有reduce过程。
6)采用分桶技术,采样测试数据。
7)Map端优化
-- 在map中做部分聚合操作,效率更高但需要更多内存空间
set hive.map.aggr=true;
-- 在Map端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval
控制map个数:
1. 通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);
a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数;
b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。
2. 是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。
而且,同时可执行的map数是受限的。
3. 是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对问题2和问题3,我们需要采取两种方式来解决:即减少map数和增加map数。
如何合并小文件,减少map数?
假设一个SQL任务:
select count(1) from popt where pt ='2012-07-04';
该任务的inputdir /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04
共有194个文件,其中很多是远远小于128m的小文件,总大小9G,正常执行会用194个map任务。Map总共消耗的计算资源: SLOTS_MILLIS_MAPS= 623,020
通过以下方法来在map执行前合并小文件,减少map数:
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
再执行上面的语句,用了74个map任务,map消耗的计算资源:SLOTS_MILLIS_MAPS= 333,500
对于这个简单SQL任务,执行时间上可能差不多,但节省了一半的计算资源。
大概解释一下,100000000表示100M,
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并,终生成了74个块。
如何适当的增加map数?
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
假设有这样一个任务:
select data_desc,
count(1),
count(distinct id),
sum(case when …),
sum(case when …),
sum(…)
from a group by data_desc;
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。
set mapred.reduce.tasks=10;
create table a_1 as
select * from a
distribute by rand(123);
这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
看上去,貌似这两种有些矛盾,一个是要合并小文件,一个是要把大文件拆成小文件,这点正是重点需要关注的地方,根据实际情况,控制map数量需要遵循两个原则:
使大数据量利用合适的map数;使单个map任务处理合适的数据量。
8)Reduce端优化
合理设置Reduce数,Reduce个数并不是越多越好。
(1)过多的启动和初始化Reduce也会消耗时间和资源;
(2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么这些小文件作为下一个任务的输入,也会出现小文件过多的问题;
在设置Reduce个数的时候需要考虑这两个原则:
处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适。
1. Hive自己如何确定reduce数:
reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
hive.exec.reducers.bytes.per.reducer
(每个reduce任务处理的数据量,默认为1000^3=1G)
hive.exec.reducers.max
(每个任务大的reduce数,默认为999)
计算reducer数的公式很简单
N=min(hive.exec.reducers.max,
总输入数据量/ hive.exec.reducers.bytes.per.reducer)
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
如:select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04′ group by pt;
/group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 总大小为9G多,因此这句有10个reduce
- 调整reduce个数
方法一:
调整hive.exec.reducers.bytes.per.reducer参数的值;
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
select pt,count(1) from popt_tbaccountcopy_mes where pt =‘2012-07-04′ group by pt;
这次有20个reducer。
方法二:
set mapred.reduce.tasks = 15;
select pt,count(1) from popt_tbaccountcopy_mes where pt =‘2012-07-04′ group by pt;
这次有15个reducer。
- 什么情况下只有一个reduce?
很多时候你会发现任务中不管数据量多大,不管你有没有设置调整reduce个数的参数,任务中一直都只有一个reduce任务;
其实只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外,还有以下原因:
a) 没有group by的汇总,
比如把
select pt,count(1) from popt_tbaccountcopy_mes where pt =‘2012-07-04′ group by pt;
写成
select count(1) from popt_tbaccountcopy_mes where pt =‘2012-07-04′;
这点非常常见,希望大家尽量改写。
b) 用了Order by
c) 有笛卡尔积
通常这些情况下,除了找办法来变通和避免,我暂时没有什么好的办法,因为这些操作都是全局的,所以hadoop不得不用一个reduce去完成。
备注:
map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是 10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的每一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred.min.split.size参数在作业提交客户端进行自定义设置。还有一个重要参数就是mapred.map.tasks,这个参数设置的map数量仅仅是一个提示,只有当InputFormat 决定了map任务的个数比mapred.map.tasks值小时才起作用。同样,Map任务的个数也能通过使用JobConf 的conf.setNumMapTasks(int num)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。当然为了提高 集群的并发效率,可以设置一个默认的map数量,当用户的map数量较小或者比本身自动分割的值还小时可以使用一个相对交大的默认值,从而提高整体 hadoop集群的效率。
9) 并行执行
set hive.exec.parallel=true;
10)数据倾斜
表现:任务进度长时间维持在99%(或),查看任务监控页面时,发现只有少量(或1个)reduce子任务未完成。因为其处理的数据量和其他reduce差异较大。单一reduce的记录数与平均纪录数差异过大,通常可能达到3倍甚至更多,大时长远大于平均时长。
原因:key分布不均导致的; 人为建表疏忽; 业务数据本身的特性;
sql语句本身就有数据倾斜:
set hive.groupby.skewindata=true;
这个参数必须记住哇,面试必考。参数配置成功后,系统会自动做负载均衡。
如何定位数据倾斜问题:哪几个key发生了数据倾斜?(手动操作)
降低操作的数据量,通过bucket采样定位key的问题,对key引入随机数,分发到多个reduce,减轻同一个reduce的压力。
11) 使用索引
hive.optimize.index.filter 自动使用索引
hive.optimize.index.groupby 使用聚合索引优化GROUP BY操作
四、常用参数
-- 设置动态分区为nonstrict模式
set hive.exec.dynamic.partition.mode=nonstrict;
-- 每个Map大输入大小
set mapred.max.split.size= 300000000;
-- 每个Map小输入大小
set mapred.min.split.size= 100000000;
-- 执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
--合并map输出文件
set hive.merge.mapfiles=true;
-- 在Map-Reduce的任务结束时合并输出文件
set hive.merge.mapredfiles=true;
-- 合并文件的大小
set hive.merge.size.per.task=300000000;
-- 打开子查询并发,是否开启 map/reduce job的并发提交。默认值为:false。
set hive.exec.parallel=true;
-- 同一个sql允许大的并行度
set hive.exec.parallel.thread.number=8;
-- 是否自动转换为mapjoin
set hive.auto.convert.join=false;
--小表的大文件大小,默认为25000000,即25M
set hive.mapjoin.smalltable.filesize = 25000000;
--是否将多个mapjoin合并为一个
set hive.auto.convert.join.noconditionaltask = true;
--多个mapjoin转换为1个时,所有小表的文件大小总和的大值。
set hive.auto.convert.join.noconditionaltask.size = 10000000;
-- hive输入格式设置,如果值为org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
-- 节点中每个map可以处理的小的文件大小
set mapred.min.split.size.per.node=300000000;
-- 机架中每个map可以处理的小的文件大小
set mapred.min.split.size.per.rack=300000000;
-- 每个reducer处理文件大小
set hive.exec.reducers.bytes.per.reducer=500000000;
-- 设置计算引擎
set hive.execution.engine=tez;
-- 决定group by操作是否支持数据倾斜处理
set hive.groupby.skewindata=true;
-- 输出时打印表头
set hive.cli.print.header = true;
☆ Hive的正则表达式和窗口函数内容比较多,有时间单列出来整理。
由于个人水平有限,若有问题,欢迎指正,谢谢!