Hive优化——Hive SQL优化


发布于 2017-04-01 / 102 阅读 / 0 评论 /
针对Hive SQL进行优化。

1.Join优化

Hive中有多种join算法,

1.1.Common Join

Hive中最稳定的join算法,通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。例如下图所示:

需要注意的是,sql语句中的join和执行计划中的Common Join任务并非一对一的关系,一个sql语句中的相邻的且关联字段相同的多个join操作可以合并为一个Common Join任务。

例如,以下sql,可以由一个Common Join任务(也就是一个MapReduce任务)实现:

Select a.val,b.val,c.val from a 
join b on a.key = b.key1
join c on c.key = b.key1

但是,针对以下sql,需要两个Common Join任务:

Select a.val,b.val,c.val from a 
join b on a.key = b.key1
join c on c.key = b.key2

1.2.Map Join

通过两个只有map阶段的Job完成一个join操作。其适合场景为“大表join小表”。若某join操作满足要求,则第一个Job会读取小表数据,将其制作为hash table,并上传至HDFS。第二个Job会先从HDFS中读取小表数据,并换存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作。如下图所示:

图中没有Reduce阶段的Job,也就少了shuffle过程,性能会有大提升。

Map Join有两种触发方式:

1.2.1.手动触发

手动触发就需要用户在SQL语句中添加Hint提示。

可以通过以下方式,指定使用map join算法,并且表ta是作为map join中的小表。例如:

Select /*+ mapjoin(ta) */
ta.id,tb.id from table_a ta
join table_b tb on ta.id=tb.id;

不过这种方式已经过时,不推荐使用。

1.2.2.自动触发

Hive在编译SQL语句阶段,起初所有的join操作均采用Common Join算法实现。

之后在物理优化阶段,Hive会根据每个Common Join任务所需表的大小排毒案该Comon Join任务是否能转换为Map Join任务,若满足要求,便将Common Join任务自动转换为Map Join任务。

但有些Common Join任务所需的表大小,在SQL的编译阶段是未知的(例如对子查询进行join操作),所以这种Common Join任务是否转换成Map Join任务在编译阶段是无法确定的。

针对这种情况,Hive会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的 Map Join任务以及原有的Common Join任务。最终具体采用哪个计划,是在运行时决定的。完整的逻辑如下图所示:

在“寻找大表候选人”阶段,跟join的类型也有关系。

相关的配置参数有:

(1)set hive.auto.convert.join=true;——是否启动Map Join转换。

(2)set hive.auto.convert.join.noconditionaltask=true;——是否开启无条件转Map Join。

(3)set hive.mapjoin.smalltable.filesize=250000;——一个Common Join Operator转换为Map Join Operator的判断条件。若该Common Join相关的表中,存在n-1张表的已知大小总和<=该配置值,则生成一个Map Join计划,此时可能存在n-1张表的组合均满足该条件,则hive会为每中满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(backup)计划,世纪运行时,优先执行Map Join计划,若不能执行成功,则启动Common Join后备计划。

(4)set hive.auto.convert.join.noconditionaltask.size=10000000;——无条件转Map Join时的小表之和阈值。若一个Common Join Operator相关的表中,存在n-1张表的大小总和<=该值,此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。而是生成一个最优的Map Join计划。

在Join的调优中,需参考上图中各个参数。

1.3.Bucket Map Join

Bucket Map Join是对Map Join算法的改进,打破了Map Join只适用于“大表join小表的限制”,可用于“大表join大表”。

Bucket Map Join的核心思想是:若能保证参与join的表均为分桶表,且关联字段为分桶字段,切其中一张表的分桶数量是另一张表的分桶数量的整数倍,就能保证参与join的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行Map Join操作。这样一来,第二个Job的Map端就无需再缓存小表的全表数据了,只需要缓存其所需的分桶即可。原理如下图所示:

就是将大表的数据切分为多个小桶,分而治之。

从图中可以看出,B-0与A-0、A-2是相关联的,而B-1与A-1、A-3是相关联的。所以生成了四个Mapper。

Bucket Map Join不支持自动转换,必须通过用户在SQL语句中提供如下Hint提示,并配置如下相关参数,方可使用。

Hint提示与Map Join一致。

参数有:

(1)set hive.cbo.enable=false;——关闭cbo优化,cbo会导致hint信息被忽略。

(2)set hive.ignore.mapjoin.hint=false;——map join hint 默认会被忽略,因为已经deprecated,需要将此参数设置为false。

(3)set hive optimize.bucketmapjoin=true;——启用bucket map join优化功能。

1.4.Sort Merge Bucket Map Join

简称SMB Map Join,是Bucket Map Join。SMB Map Join要求:参与join的表均为分桶表,且须保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段都为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。

SMB Map Join同Bucket Join一样,同样是利用两表各分桶之间的关联关系,在分桶之间进行join操作,不同的是分桶之间的join操作的实现原理。Bucket Map Join中,两个分桶之间的join实现原理为Hash Join算法;而SMB Map Join中,两个分桶之间的join实现原理为Sort Merge Join算法。

Hash Join和Sort Merge Join均为关系型数据库中常见的Join算法。Hash Join的原理相对简单,就是对参与join的一张表构建hash table,然后扫描另外一张表,然后进行匹配。Sort Merge Join需要在两张按照关联字段排序好的表中进行。

原理如下图所示:

相当于对两张表顺序读,只有等值判断过程,少了查询的过程。

SMB Map Join也有两种触发方式:Hint提示和自动转换。hint提示已过时,不推荐使用。

自动转换的相关参数有:

(1)set hive.optimize.bucketmapjoin.sortedmerge=true;——启动Sort Merge Bucket Map Join优化。

(2)set hive.auto.convert.sortmerge.join=true;——使用自动转换SMB Join。

2.数据倾斜调优

数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他的key,导致在shuffle阶段,大量相同key的数据被发往同一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。

Hive中数据倾斜常出现在分组聚合和join操作的场景中,下面分别介绍这两个场景下的优化思路。

2.1.分组聚合导致的数据倾斜

如果group by分组字段的值分配不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜。这种问题有以下两种解决思路:

(1)Map-Side聚合优化

开启Map-Side聚合后,数据会在Map端完成部分聚合工作,这样一来,即便是原始数据是倾斜的,经过Map端的初步聚合后,发往Reduce的数据也就不再倾斜了。最佳状态下,Map-Side聚合能完全屏蔽数据倾斜问题。

开启Map-Side聚合的方法如《4.2.2.分组聚合优化》章节所示。

(2)Skew-GroupBy优化

Skew-GroupBy原理是启动两个MR任务。第一个MR按照随机数分区,将数据分散发送到Reduce,完成部分聚合;第二个MR按照分组字段分区,完成最终聚合。

开启Skey-GroupBy优化需要设置以下参数:

Set hive.groupby.skewindata=true;——启用分组聚合数据倾斜优化。默认为false。

一般来说,Map-Side聚合优化的效果要比Skey-GroupBy优化更好。Skey-GroupBy优化的最佳场景在某些数据场景下,Map-Side优化需要大量的内存,并进行不停的shuffle操作,会导致性能下降,这个时候Skey-GroupBy优化就可以闪亮登场了。

2.2.Join导致的数据倾斜

如果关联字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。基本有以下三种优化思路:

2.2.1.map join

可参考《Map Join》章节内容。只适用于大表Join小表的情况。

2.2.2.Skew Join

skew join的原理是:为倾斜的大key单独启动一个map join任务进行计算,其余key进行正常的common join。原理如下图所示:

表A中K1的数据量相比其他的key明显更多,也就是说数据倾斜了。如果启用了skew join后,底层会把表A对应K1的数据和表B对应K1的数据分别写入两个HDFS文件,然后这两个大表和小表进行map join。(注:图中常见假设表A存在数据倾斜,而表B不存在数据倾斜)

相关参数如下:

set hive.optimize.skewjoin=true;——启用skew join优化。

Set hive.skewjoin.key=100000;——触发skew join的阈值,若某个key的行数超过该参数值,则会认为这个可以是数据倾斜的,则触发优化计划。

skew join优化方案对参与join的源表大小没有要求,但是对两表中倾斜的key的数据量有要求,要求一张表中的倾斜的key的数据量比较小(方便走map join)。

2.2.3.调整SQL语句

若参与join的两表均为大表,其中一张表的数据是倾斜的,此时也可以通过以下方式对SQL语句进行相应的调整。

下面通过案例来学习,A、B两张表均为大表,且其中一张表的数据是倾斜的。

假如原始SQL如下:

Select * from A
Join B on A.id = B.id;

调整后的SQL如下:

Select * from (
	select  --打散操作
	concat(id, ‘_’, cast(rand() * 2 as int)) id, value
	from A
) ta
Join (
Select --扩容操作
	concat(id, ‘_’ , 0) id, value
From B
Union all
Select concat(id, ‘_’, 1) id, value
From B
) tb on ta.id = tb.id;

优化中,把一个大key划分为两个均等的小的key,当然,也可以划分为多个,这里就要通过“rand() * N”来实现了,并调整扩容之后的语句。

以上三种优化思路具有其适用场景。针对两张大表都是数据倾斜的情况,当前还没有很好的优化方案。

3.并行度优化

对于一个分布式计算任务而言,设置一个合适的并行度十分重要。Hive的计算任务由MapReduce完成,故并行度的调整需要分为Map端和Reduce端。

3.1.调整Map端并行度

Map端的并行度,是由输入文件的切片数决定的。一般来说,Map端的并行度无需手动调整。但是,如果出现以下情况,可以考虑调整Map端并行度。

(1)查询的表中存在大量小文件

按照Hadoop默认的切片策略,一个小文件会单独启动一个map task负责计算。若查询的表中存在大量小文件,则会启动大量map task,造成计算资源的浪费。这种情况下,可以使用Hive提供的CombineHiveInputFormat,多个小文件合并为一个切片,从而控制map task个数。相关参数为:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

(2)map端游复杂的查询逻辑

若SQL语句中游正则替换、json解析等复杂耗时的查询逻辑时,map端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增加map端的并行度,令map task多一些,每个map task计算的数据少一些。相关参数如下:

set mapreduce.input.fileinputformat.spllit.maxsize=256000000;——一个切片的最大值。

3.2.调整Reduce端并行度

Reduce端的并行度,也就是Reduce个数。相对来说,更需要关注。Reduce端的并行度,可由用户自己指定,也可由Hive自行根据MR job输入的文件大小进行估算。

Reduce端的并行度的相关参数如下:

(1)Set mapreduce.job.reduces;——指定Reduce端并行度,默认值为-1,表示用户未指定。

(2)Set hive.exec.reducers.max;——Reduce端并行度最大值。

(3)Set hive.exec.reducers.bytes.per.reducer;——单个Reduce Task计算的数据量,用于估算Reduce并行度。

具体的Reduce端的并行度确定逻辑如下所示:

若指定参数mapreduce.job.reduces的值为非负整数,则Reduce并行度为指定的值。否则,Hive会估算Reduce并行度,估算表达式如下:

min(ceil(totalInputBytes/byesPerReducer), maxReducers)

表达式中,totalInputBytes表示Job输入的文件大小;bytesPerReducer是参数hive.exec.reducers.bytes.per.reducer的值;maxReducer是参数hive.exec.reducers.max的值。

估算的值是依据MR job输入文件的大小作为依据的,这个是不太准确的,但是MapReduce引擎已经过时了,开发者没有进行后续的优化。

4.小文件合并优化

小文件合并有Map端小文件合并和Reduce端小文件合并。

4.1.Map端输入文件合并

可参考《调整Map端并行度》章节。

4.2.Reduce输出文件合并

合并Reduce端输出小文件,是指多个小文件合并成大文件,目的是减少HDFS小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。

相关参数有以下4个:

(1)set hive.merge.mapfiles=true;——开启合并map only任务输出的小文件。默认值为false。

(2)set hive.merge.mapredfiles=true;——开启合并map reduce任务输出的小文件。默认值为false。

(3)set hive.merge.size.per.task=256 000 000;——合并后的文件大小。

(4)set hive.merge.smallfiles.avgsize=16 000 000;——触发小文件合并的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并。