Hive3.1.3基础学习
一、Hive入门与安装
1、Hive入门
1.1 简介
Hive是由Facebook开源,基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能
Hive是一个Hadoop客户端,用于将HQL(Hive SQL)转化成MapReduce程序
- Hive中每张表的数据存储在HDFS
- Hive分析数据底层的实现是MapReduce(也可配置为Spark或者Tez)
- 执行程序运行在Yarn上
1.2 Hive架构原理
用户接口:Client
JDBC的移植性比ODBC好;(通常情况下,安装完ODBC驱动程序之后,还需要经过确定的配置才能够应用。而不相同的配置在不相同数据库服务器之间不能够通用。所以,安装一次就需要再配置一次。JDBC只需要选取适当的JDBC数据库驱动程序,就不需要额外的配置。在安装过程中,JDBC数据库驱动程序会自己完成有关的配置。)两者使用的语言不同,JDBC在Java编程时使用,ODBC一般在C/C++编程时使用
元数据:Metastore
元数据包括:数据库(默认是default)、表名、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等。默认存储在自带的derby数据库中,由于derby数据库只支持单客户端访问,生产环境中为了多人开发,推荐使用MySQL存储Metastore
驱动器:Driver
- 解析器(SQLParser):将SQL字符串转换成抽象语法树(AST)
- 语义分析(Semantic Analyzer):将AST进一步划分为QeuryBlock
- 逻辑计划生成器(Logical Plan Gen):将语法树生成逻辑计划
- 逻辑优化器(Logical Optimizer):对逻辑计划进行优化
- 物理计划生成器(Physical Plan Gen):根据优化后的逻辑计划生成物理计划
- 物理优化器(Physical Optimizer):对物理计划进行优化
- 执行器(Execution):执行该计划,得到查询结果并返回给客户端
2、Hive安装
2.1 安装地址
Hive官网地址:http://hive.apache.org/
文档查看地址:https://cwiki.apache.org/confluence/display/Hive/GettingStarted
下载地址:http://archive.apache.org/dist/hive/
github地址:https://github.com/apache/hive
2.2 Hive最小化安装(测试用)
1 | # 本文章前请先学习hadoop |
2.3 MySQL安装
1 | # 下载mysql,离线安装 |
2.4 配置Hive元数据存储到MySQL
1 | # 新建Hive元数据库 |
添加如下内容:
1 |
|
1 | # 初始化Hive元数据库(修改为采用MySQL存储元数据) |
2.5 Hive服务部署
Hive的hiveserver2服务的作用是提供jdbc/odbc接口,为用户提供远程访问Hive数据的功能,例如用户期望在个人电脑中访问远程服务中的Hive数据,就需要用到Hiveserver2
在远程访问Hive数据时,客户端并未直接访问Hadoop集群,而是由Hivesever2代理访问。由于Hadoop集群中的数据具备访问权限控制,所以此时需考虑一个问题:那就是访问Hadoop集群的用户身份是谁?是Hiveserver2的启动用户?还是客户端的登录用户?
答案是都有可能,具体是谁,由Hiveserver2的hive.server2.enable.doAs
参数决定,该参数的含义是是否启用Hiveserver2用户模拟的功能。若启用,则Hiveserver2会模拟成客户端的登录用户去访问Hadoop集群的数据,不启用,则Hivesever2会直接使用启动用户访问Hadoop集群数据。模拟用户的功能,默认是开启的。生产环境,推荐开启用户模拟功能,因为开启后才能保证各用户之间的权限隔离
1 | # hivesever2的模拟用户功能,依赖于Hadoop提供的proxy user(代理用户功能),只有Hadoop中的代理用户才能模拟其他用户的身份访问Hadoop集群。 |
接下来进行metastore部署
Hive的metastore服务的作用是为Hive CLI或者Hiveserver2提供元数据访问接口,metastore有两种运行模式,分别为嵌入式模式(之前默认就是嵌入式启动)和独立服务模式。
生产环境中,不推荐使用嵌入式模式。因为其存在以下两个问题:嵌入式模式下,每个Hive CLI都需要直接连接元数据库,当Hive CLI较多时,数据库压力会比较大;每个客户端都需要用户元数据库的读写权限,元数据库的安全得不到很好的保证。
1 | # ==========================嵌入式模式======================= |
2.6 Hive服务启动脚本(了解)
- nohup:放在命令开头,表示不挂起,也就是关闭终端进程也继续保持运行状态
- /dev/null:是Linux文件系统中的一个文件,被称为黑洞,所有写入该文件的内容都会被自动丢弃
- 2>&1:表示将错误重定向到标准输出上
- &:放在命令结尾,表示后台运行
一般会组合使用:nohup [xxx命令操作]> file 2>&1 &,表示将xxx命令运行的结果输出到file中,并保持命令启动的进程在后台运行。
1 | # 直接编写脚本来管理服务的启动和关闭 |
3、Hive使用技巧
3.1 Hive常用交互命令
1 | hive -help |
3.2 Hive参数配置方式
1 | # 查看当前所有的配置信息 |
3.3 Hive常见属性配置
Hive客户端显示当前库和表头,vim hive-site.xml
1 | <property> |
Hive运行日志路径配置,Hive的log默认存放在/tmp/atguigu/hive.log
目录下(当前用户名下),修改Hive的log存放日志到/opt/module/hive/logs
1 | # 修改$HIVE_HOME/conf/hive-log4j2.properties.template文件名称为hive-log4j2.properties |
Hive的JVM堆内存设置,新版本的Hive启动的时候,默认申请的JVM堆内存大小为256M,JVM堆内存申请的太小,导致后期开启本地模式,执行复杂的SQL时经常会报错:java.lang.OutOfMemoryError: Java heap space,因此最好提前调整一下HADOOP_HEAPSIZE这个参数
1 | # 修改$HIVE_HOME/conf下的hive-env.sh.template为hive-env.sh |
关闭Hadoop虚拟内存检查,在yarn-site.xml中关闭虚拟内存检查(虚拟内存校验,如果已经关闭了,就不需要配了),修改完后记得分发yarn-site.xml,并重启yarn。vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
1 | <property> |
二、DDL&DML语句
1、DDL(Data Definition Language)数据定义
1.1 数据库
1 | # 创建数据库 |
1.2 表(table)创建
普通建表
1 | # 创建表 |
-
TEMPORARY,临时表,该表只在当前会话可见,会话结束,表会被删除
-
EXTERNAL(重点),外部表,与之相对应的是内部表(管理表)。管理表意味着Hive会完全接管该表,包括元数据和HDFS中的数据。而外部表则意味着Hive只接管元数据,而不完全接管HDFS中的数据
-
data_type(重点),Hive中的字段类型可分为基本数据类型和复杂数据类型
Hive 说明 定义 t****inyint 1byte有符号整数 s****mallint 2byte有符号整数 i****nt 4byte有符号整数 b****igint 8byte有符号整数 b****oolean 布尔类型,true或者false f****loat 单精度浮点数 d****ouble 双精度浮点数 decimal 十进制精准数字类型 decimal(16,2) varchar 字符序列,需指定最大长度,最大长度的范围是[1,65535] varchar(32) string 字符串,无需指定最大长度 t****imestamp 时间类型 b****inary 二进制数据 复杂数据类型如下 类型 说明 定义 ------ ---------------------------- ---------------------------- array 数组是一组相同类型的值的集合 array<string> map map是一组相同类型的键-值对集合 map<string, int> struct 结构体由多个属性组成,每个属性都有自己的属性名和数据类型 struct<id:int, name:string> Hive的基本数据类型可以做类型转换,转换的方式包括隐式转换以及显示转换,详情可参考Hive官方说明:Allowed Implicit Conversions - 任何整数类型都可以隐式地转换为一个范围更广的类型,如tinyint可以转换成int,int可以转换成bigint
- 所有整数类型、float和string类型都可以隐式地转换成double。
- tinyint、smallint、int都可以转换为float
- boolean类型不可以转换为任何其它的类型
1
2
3# 显示转换可以借助cast函数完成显示的类型转换
# cast(expr as <type>)
hive (default)> select '1' + 2, cast('1' as int) + 2; -
PARTITIONED BY(重点),创建分区表
-
CLUSTERED BY … SORTED BY…INTO … BUCKETS(重点),创建分桶表
-
ROW FORMAT(重点),指定SERDE,SERDE是Serializer and Deserializer的简写。Hive使用SERDE序列化和反序列化每行数据。详情可参考 Hive-Serde
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 语法一:DELIMITED关键字表示对文件中的每个字段按照特定分割符进行分割,其会使用默认的SERDE对每行数据进行序列化和反序列化
ROW FORAMT DELIMITED
[FIELDS TERMINATED BY char]
[COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char]
[LINES TERMINATED BY char]
[NULL DEFINED AS char]
# fields terminated by :列分隔符
# collection items terminated by : map、struct和array中每个元素之间的分隔符
# map keys terminated by :map中的key与value的分隔符
# lines terminated by :行分隔符
# [TBLPROPERTIES (property_name=property_value, ...)]
# [AS select_statement]
# null 一般在hdfs用/N表示
# 语法二:SERDE关键字可用于指定其他内置的SERDE或者用户自定义的SERDE。例如JSON SERDE,可用于处理JSON字符串
ROW FORMAT SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value,property_name=property_value, ...)] -
STORED AS(重点),指定文件格式,常用的文件格式有,textfile(默认值),sequence file,orc file、parquet file等等
-
xxxxxxxxxx # 进入解压后的Hadoop源码目录下#开始编译mvn clean package -DskipTests -Pdist,native -Dtar# 注意:第一次编译需要下载很多依赖jar包,编译时间会很久,预计1小时左右,最终成功是全部SUCCESS# 成功的64位hadoop包在/opt/module/hadoop_source/hadoop-3.1.3-src/hadoop-dist/target下bash
-
TBLPROPERTIES,用于配置表的一些KV键值对参数
Create Table As Select(CTAS)建表
1 | # 该语法允许用户利用select查询语句返回的结果,直接建表,表的结构和查询语句的结构保持一致,且保证包含select查询语句放回的内容。 |
Create Table Like语法
1 | # 该语法允许用户复刻一张已经存在的表结构,与上述的CTAS语法不同,该语法创建出来的表中不包含数据 |
案例说明
1 | # =====================内部表与外部表==================== |
1.3 表的增删查改
1 | # ====================查看表===================== |
2、DML(Data Manipulation Language)数据操作
2.1 Load
Load语句可将文件导入到Hive表中,关键字说明:
- local:表示从本地加载数据到Hive表;否则从HDFS加载数据到Hive表
- overwrite:表示覆盖表中已有数据,否则表示追加
- partition:表示上传到指定分区,若目标是分区表,需指定分区
1 | # 语法 |
2.2 Insert
LOAD DATA
用于将外部数据直接加载到Hive表中,而INSERT INTO
用于将数据从一个表插入到另一个表中或将查询结果插入到表中,并可以进行数据转换、过滤或聚合操作
1 | # ====================将查询结果插入表中===== |
2.3 Export&Import
Export导出语句可将表的数据和元数据信息一并到处的HDFS路径,Import可将Export导出的内容导入Hive,表的数据和元数据信息都会恢复。Export和Import可用于两个Hive实例之间的数据迁移
1 | --导出 |
三、查询
1、基础语法
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
基本语法
1 | SELECT [ALL | DISTINCT] select_expr, select_expr, ... |
2、基本查询(Select…From)
2.1 数据准备
1 | # 在/opt/module/hive/datas/路径上创建dept.txt文件,并赋值如下内容: |
2.2 基本查询操作
1 | # SQL 语言大小写不敏感 |
2.3 关系运算函数
操作符 | 支持的数据类型 | 描述 |
---|---|---|
A=B | 基本数据类型 | 如果A等于B则返回true,反之返回false |
A<=>B | 基本数据类型 | 如果A和B都为null或者都不为null,则返回true,如果只有一边为null,返回false |
A<>B, A!=B | 基本数据类型 | A或者B为null则返回null;如果A不等于B,则返回true,反之返回false |
A<B | 基本数据类型 | A或者B为null,则返回null;如果A小于B,则返回true,反之返回false |
A<=B | 基本数据类型 | A或者B为null,则返回null;如果A小于等于B,则返回true,反之返回false |
A>B | 基本数据类型 | A或者B为null,则返回null;如果A大于B,则返回true,反之返回false |
A>=B | 基本数据类型 | A或者B为null,则返回null;如果A大于等于B,则返回true,反之返回false |
A [not] between B and C | 基本数据类型 | 如果A,B或者C任一为null,则结果为null。如果A的值大于等于B而且小于或等于C,则结果为true,反之为false。如果使用not关键字则可达到相反的效果。 |
A is null | 所有数据类型 | 如果A等于null,则返回true,反之返回false |
A is not null | 所有数据类型 | 如果A不等于null,则返回true,反之返回false |
in(数值1,数值2) | 所有数据类型 | 使用 in运算显示列表中的值 |
A [not] like B | string 类型 | B是一个SQL下的简单正则表达式,也叫通配符模式,如果A与其匹配的话,则返回true;反之返回false。B的表达式说明如下:‘x%’表示A必须以字母‘x’开头,‘%x’表示A必须以字母‘x’结尾,而‘%x%’表示A包含有字母‘x’,可以位于开头,结尾或者字符串中间。如果使用not关键字则可达到相反的效果。 |
A rlike B, A regexp B | string 类型 | B是基于java的正则表达式,如果A与其匹配,则返回true;反之返回false。匹配使用的是JDK中的正则表达式接口实现的,因为正则也依据其中的规则。例如,正则表达式必须和整个字符串A相匹配,而不是只需与其字符串匹配。 |
2.4 逻辑运算函数
1 | # 基本语法(and/or/not) |
2.5 聚合函数
- count(*),表示统计所有行数,包含null值
- count(某列),表示该列一共有多少行,不包含null值
- max(),求最大值,不包含null,除非所有值都是null
- min(),求最小值,不包含null,除非所有值都是null
- sum(),求和,不包含null
- avg(),求平均值,不包含null
1 | # 本地模式mapreduce都运行在一个进程内,里面各个线程,之前配的是yarn |
3、分组
3.1 Group By语句
Group By语句通常会和聚合函数一起使用,按照一个或者多个列队结果进行分组,然后对每个组执行聚合操作
1 | # 计算emp表每个部门的平均工资 |
3.2 Having语句
having与where不同点
- where后面不能写分组聚合函数,而having后面可以使用分组聚合函数
- having只用于group by分组统计语句
1 | # 求每个部门的平均工资 |
4、Join语句
1 | # 等值Join |
5、排序
5.1 全局排序(Order By)
Order By:全局排序,只有一个Reduce
- asc(ascend):升序(默认)
- desc(descend):降序
1 | # 通常和limit配合使用 |
5.2 每个Reduce内部排序(Sort By)
Sort By:对于大规模的数据集order by的效率非常低。在很多情况下,并不需要全局排序,此时可以使用Sort by。Sort by为每个reduce产生一个排序文件。每个Reduce内部进行排序,对全局结果集来说不是排序。
1 | # 设置reduce个数 |
5.3 分区(Distribute By)
Distribute By:在有些情况下,我们需要控制某个特定行应该到哪个Reducer,通常是为了进行后续的聚集操作。distribute by子句可以做这件事。distribute by类似MapReduce中partition(自定义分区),进行分区,结合sort by使用。
对于distribute by进行测试,一定要分配多reduce进行处理,否则无法看到distribute by的效果
1 | set mapreduce.job.reduces=3; |
- distribute by的分区规则是根据分区字段的hash码与reduce的个数进行相除后,余数相同的分到一个区
- Hive要求distribute by语句要写在sort by语句之前
注意:演示完以后mapreduce.job.reduces的值要设置回-1,否则下面分区or分桶表load跑MapReduce的时候会报错
5.4 分区排序(Cluster By)
当distribute by和sort by字段相同时,可以使用cluster by方式。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为asc或者desc
1 | # 注意:按照部门编号分区,不一定就是固定死的数值,可以是20号和30号部门分到一个分区里面去 |
6、函数
6.1 简介
Hive会将常用的逻辑封装成函数给用户进行使用,类似于Java中的函数,文档参考:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
1 | # 查看系统内置函数 |
6.2 单行函数
单行函数按照功能可分为如下几类: 日期函数、字符串函数、集合函数、数学函数、流程控制函数等
1 | # =======数值函数 |
6.3 高级聚合函数
1 | # 普通聚合 count/sum |
6.4 炸裂函数
UDTF (Table-GeneratingFunctions),接收一行数据,输出一行或多行数据
1 | # 使用语法 |
6.5 窗口函数(开窗函数)
窗口函数,能为每行数据划分—个窗口,然后对窗口范围内的数据进行计算,最后将计算结果返回给该行数据。https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics
窗口函数的语法中主要包括“窗口”和“函数”两部分。其中“窗口”用于定义计算范围,“函数”用于定义计算逻辑。
1 | select order_id,order_date,amount,函数(amount) over(窗口范围) total_amount from order_info; |
窗口范围的定义分为两种类型,一种是基于行的,一种是基于值的
- 基于行示例:要求每行数据的窗口为上一行到当前行
- 基于值示例:要求每行数据的窗口为,值位于当前值-1,到当前值(关键词range)
窗口分区,定义窗口范围时,可以指定分区字段,每个分区单独划分窗口
对于窗口缺省函数
- partition by省略不写,表示不分区
- order by 省略不写,表示不排序
- (rows|range) between … and …省略不写,则使用其默认值,默认值如下:若over()中包含order by,则默认值为
range between unbounded preceding and current row;
若over()中不包含order by,则默认值为rows between unbounded preceding and unbounded following
下面举例其他常用的窗口函数,lead和lag,first_value和last_value,排名函数,注:lag和lead,rank 、dense_rank、row_number函数不支持自定义窗口
1 | # 案例实操 |
7、自定义函数
7.1 介绍
官网文档地址:https://cwiki.apache.org/confluence/display/Hive/HivePlugins
Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展,当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)
根据用户自定义函数类别分为以下三种
- UDF(User-Defined-Function)一进一出
- UDAF(User-Defined Aggregation Function),用户自定义聚合函数,多进一出。类似于:count/max/min
- UDTF(User-Defined Table-Generating Functions),用户自定义表生成函数,一进多出。如lateral view explode()
1 | # 编程步骤如下 |
7.2 自定义UDF函数
需求:自定义一个UDF实现计算给定基本数据类型的长度,例如select my_len("abcd");
首先创建一个Maven工程Hive导入依赖(自定义UDTF也是同理)
1 | <dependencies> |
创建类
1 | /** |
1 | # ==================创建临时函数======================= |
四、分区、分桶表和文件
1、分区表
Hive中的分区就是把一张大表的数据按照业务需要分散的存储到多个目录,每个目录就称为该表的一个分区。在查询时通过where子句中的表达式选择查询所需要的分区,这样的查询效率会提高很多
1.1 分区表基本语法
1 | create table dept_partition |
修复分区
Hive将分区表的所有分区信息都保存在了元数据中,只有元数据与HDFS上的分区路径一致时,分区表才能正常读写数据。若用户手动创建/删除分区路径,Hive都是感知不到的,这样就会导致Hive的元数据和HDFS的分区路径不一致。再比如,若分区表为外部表,用户执行drop partition命令后,分区元数据会被删除,而HDFS的分区路径不会被删除,同样会导致Hive的元数据和HDFS的分区路径不一致。
若出现元数据和HDFS路径不一致的情况,可通过如下几种手段进行修复
- add partition,若手动创建HDFS的分区路径,Hive无法识别,可通过add partition命令增加分区元数据信息,从而使元数据和分区路径保持一致
- drop partition,若手动删除HDFS的分区路径,Hive无法识别,可通过drop partition命令删除分区元数据信息,从而使元数据和分区路径保持一致
- msck,若分区元数据和HDFS的分区路径不一致,还可使用msck命令进行修复
1 | msck repair table table_name [add/drop/sync partitions]; |
1.2 二级分区表
1 | # 如果一天内的日志数据量也很大,如何再将数据拆分?答案是二级分区表,例如可以在按天分区的基础上,再对每天的数据按小时进行分区 |
1.3 动态分区
动态分区是指向分区表insert数据时,被写往的分区不由用户指定,而是由每行数据的最后一个字段的值来动态的决定。使用动态分区,可只用一个insert语句将数据写入多个分区
1 | # 动态分区功能总开关(默认true,开启) |
2、分桶表
2.1 概述
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围划分,分区针对的是数据的存储路径,分桶针对的是数据文件。
分桶表的基本原理是,首先为每行数据计算一个指定字段的数据的hash值,然后模以一个指定的分桶数,最后将取模运算结果相同的行,写入同一个文件中,这个文件就称为一个分桶(bucket)
2.2 分桶表基本语法
1 | # 建表语句 |
2.3 分桶排序表
1 | # 排序是指在分桶的基础上,对每个桶中的数据按照某一列或多列进行排序。排序可以优化查询性能,因为查询可以通过更有效地利用数据的有序性来加速执行。排序可以在创建表格时进行定义,指定排序的列和排序顺序 |
3、Hive文件格式
为Hive表中的数据选择一个合适的文件格式,对提高查询性能的提高是十分有益的。Hive表数据的存储格式,可以选择text file、orc、parquet、sequence file等
3.1 Text File
文本文件是Hive默认使用的文件格式,文本文件中的一行内容,就对应Hive表中的一行记录。可通过以下建表语句指定文件格式为文本文件:
1 | create table textfile_table(column_specs) stored as textfile; |
3.2 ORC
ORC(Optimized Row Columnar)file format是Hive 0.11版里引入的一种列式存储的文件格式。ORC文件能够提高Hive读写数据和处理数据的性能。与列式存储相对的是行式存
-
行存储的特点
查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。
-
列存储的特点
因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法
前文提到的text file和sequence file都是基于行存储的,orc和parquet是基于列式存储的。orc文件的具体结构如下图所示
每个ORC文件由Header、Body和Tail三部分组成,其中Header内容为ORC,用于表示文件类型。Body由1个或多个stripe组成,每个stripe一般为HDFS的块大小,每一个stripe包含多条记录,这些记录按照列进行独立存储,每个stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer
。
- **Index Data:**一个轻量级的index,默认是为各列每隔1W行做一个索引。每个索引会记录第n万行的位置,和最近一万行的最大值和最小值等信息
- **Row Data:**存的是具体的数据,按列进行存储,并对每个列进行编码,分成多个Stream来存储
- **Stripe Footer:**存放的是各个Stream的位置以及各column的编码信息
Tail由File Footer和PostScript组成。File Footer中保存了各Stripe的其实位置、索引长度、数据长度等信息,各Column的统计信息等;PostScript记录了整个文件的压缩类型以及File Footer的长度信息等。
在读取ORC文件时,会先从最后一个字节读取PostScript长度,进而读取到PostScript,从里面解析到File Footer长度,进而读取FileFooter,从中解析到各个Stripe信息,再读各个Stripe,即从后往前读。
1 | # 建表语句 |
参数 | 默认值 | 说明 |
---|---|---|
orc.compress | ZLIB | 压缩格式,可选项:NONE、ZLIB,、SNAPPY,一般和SNAPPY配合 |
orc.compress.size | 262,144 | 每个压缩块的大小(ORC文件是分块压缩的) |
orc.stripe.size | 67,108,864 | 每个stripe的大小 |
orc.row.index.stride | 10,000 | 索引步长(每隔多少行数据建一条索引) |
3.3 parquet
Parquet文件是Hadoop生态中的一个通用的文件格式,它也是一个列式存储的文件格式。Parquet文件的格式如下图所示
上图展示了一个Parquet文件的基本结构,文件的首尾都是该文件的Magic Code,用于校验它是否是一个Parquet文件。首尾中间由若干个Row Group和一个Footer(File Meta Data)组成。每个Row Group包含多个Column Chunk,每个Column Chunk包含多个Page。以下是Row Group、Column,Chunk和Page三个概念的说明:
- 行组(Row Group):一个行组对应逻辑表中的若干行
- 列块(Column Chunk):一个行组中的一列保存在一个列块中
- 页(Page):一个列块的数据会划分为若干个页
- Footer(File Meta Data)中存储了每个行组(Row Group)中的每个列快(Column Chunk)的元数据信息,元数据信息包含了该列的数据类型、该列的编码方式、该类的Data Page位置等信息。
1 | Create table parquet_table |
参数 | 默认值 | 说明 |
---|---|---|
parquet.compression | uncompressed | 压缩格式,可选项:uncompressed,snappy,gzip,lzo,brotli,lz4 |
parquet.block.size | 134217728 | 行组大小,通常与HDFS块大小保持一致 |
parquet.page.size | 1048576 | 页大小 |
4、压缩
hive的压缩和hadoop保持一致,详情参考hadoop
压缩格式 | 算法 | 文件扩展名 | 是否可切分 |
---|---|---|---|
DEFLATE | DEFLATE | .deflate | 否 |
Gzip | DEFLATE | .gz | 否 |
bzip2 | bzip2 | .bz2 | 是 |
LZO | LZO | .lzo | 是 |
Snappy | Snappy | .snappy | 否 |
4.1 Hive表数据进行压缩
在Hive中,不同文件类型的表,声明数据压缩的方式是不同的
TextFile
若一张表的文件类型为TextFile,若需要对该表中的数据进行压缩,多数情况下,无需在建表语句做出声明。直接将压缩后的文件导入到该表即可,Hive在查询表中数据时,可自动识别其压缩格式,进行解压。需要注意的是,在执行往表中导入数据的SQL语句时,用户需设置以下参数,来保证写入表中的数据是被压缩的
1 | --SQL语句的最终输出结果是否压缩 |
ORC
若一张表的文件类型为ORC,若需要对该表数据进行压缩,需在建表语句中声明压缩
1 | create table orc_table |
Parquet
若一张表的文件类型为Parquet,若需要对该表数据进行压缩
1 | create table orc_table |
4.2 计算过程中使用压缩
1 | # 单个MR的中间结果进行压缩 |
五、企业级调优
1、计算资源配置
1.1 Yarn资源配置
yarn.nodemanager.resource.memory-mb
该参数的含义是,一个NodeManager节点分配给Container使用的内存。该参数的配置,取决于NodeManager所在节点的总内存容量和该节点运行的其他服务的数量,默认8Gyarn.nodemanager.resource.cpu-vcores
该参数的含义是,一个NodeManager节点分配给Container使用的CPU核数。该参数的配置,同样取决于NodeManager所在节点的总CPU核数和该节点运行的其他服务,默认8核,一般一个核心对于4gyarn.scheduler.maximum-allocation-mb
该参数的含义是,单个Container能够使用的最大内存yarn.scheduler.minimum-allocation-mb
该参数的含义是,单个Container能够使用的最小内存
修改$HADOOP_HOME/etc/hadoop/yarn-site.xml文件
1 | <property> |
1.2 MapReduce资源配置
MapReduce资源配置主要包括Map Task的内存和CPU核数,以及Reduce Task的内存和CPU核数。核心配置参数如下:
mapreduce.map.memory.mb
该参数的含义是,单个Map Task申请的container容器内存大小,其默认值为1024。该值不能超出yarn.scheduler.maximum-allocation-mb和yarn.scheduler.minimum-allocation-mb规定的范围。该参数需要根据不同的计算任务单独进行配置,在hive中,可直接使用如下方式为每个SQL语句单独进行配置:
1 | set mapreduce.map.memory.mb=2048; |
mapreduce.map.cpu.vcores
该参数的含义是,单个Map Task申请的container容器cpu核数,其默认值为1。该值一般无需调整mapreduce.reduce.memory.mb
该参数的含义是,单个Reduce Task申请的container容器内存大小,其默认值为1024。该值同样不能超出yarn.scheduler.maximum-allocation-mb和yarn.scheduler.minimum-allocation-mb规定的范围。该参数需要根据不同的计算任务单独进行配置,在hive中,可直接使用如下方式为每个SQL语句单独进行配置:
1 | set mapreduce.reduce.memory.mb=2048; |
mapreduce.reduce.cpu.vcores
该参数的含义是,单个Reduce Task申请的container容器cpu核数,其默认值为1。该值一般无需调整
2、测试用表
1 | # 资料包自行领取,https://download.csdn.net/download/lemon_TT/87685013 |
3、Explain查看执行计划(重点)
3.1 Explain执行计划概述
Explain呈现的执行计划,由一系列Stage组成,这一系列Stage具有依赖关系,每个Stage对应一个MapReduce Job,或者一个文件系统操作等。
若某个Stage对应的一个MapReduce Job,其Map端和Reduce端的计算逻辑分别由Map Operator Tree和Reduce Operator Tree进行描述,Operator Tree由一系列的Operator组成,一个Operator代表在Map或Reduce阶段的一个单一的逻辑操作,例如TableScan Operator,Select Operator,Join Operator等
- TableScan:表扫描操作,通常map端第一个操作肯定是表扫描操作
- Select Operator:选取操作
- Group By Operator:分组聚合操作
- Reduce Output Operator:输出到 reduce 操作
- Filter Operator:过滤操作
- Join Operator:join 操作
- File Output Operator:文件输出操作
- Fetch Operator 客户端获取数据操作
3.2 基本用法
1 | EXPLAIN [FORMATTED | EXTENDED | DEPENDENCY] query-sql |
4、 HQL之分组聚合优化
4.1 优化说明
Hive中未经优化的分组聚合,是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。
Hive对分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是map-side聚合。所谓map-side聚合,就是在map端维护一个hash table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至reduce端,完成最终的聚合。map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率
1 | --启用map-side聚合 |
5、HQL之Join优化
5.1 Join算法概述
Hive拥有多种join算法,包括Common Join,Map Join,Bucket Map Join,Sort Merge Buckt Map Join等
Common Join
Common Join是Hive中最稳定的join算法,其通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作
需要注意的是,sql语句中的join操作和执行计划中的Common Join任务并非一对一的关系,一个sql语句中的相邻的且关联字段相同的多个join操作可以合并为一个Common Join任务
Map Join
Map Join算法可以通过两个只有map阶段的Job完成一个join操作。其适用场景为大表join小表。若某join操作满足要求,则第一个Job会读取小表数据,将其制作为hash table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作
Bucket Map Join
Bucket Map Join是对Map Join算法的改进,其打破了Map Join只适用于大表join小表的限制,可用于大表join大表的场景。Bucket Map Join的核心思想是:若能保证参与join的表均为分桶表,且关联字段为分桶字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍,就能保证参与join的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行Map Join操作了。这样一来,第二个Job的Map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶即可。
Sort Merge Bucket Map Join
Sort Merge Bucket Map Join(简称SMB Map Join)基于Bucket Map Join。SMB Map Join要求,参与join的表均为分桶表,且需保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
SMB Map Join同Bucket Join一样,同样是利用两表各分桶之间的关联关系,在分桶之间进行join操作,不同的是,分桶之间的join操作的实现原理。Bucket Map Join,两个分桶之间的join实现原理为Hash Join算法;而SMB Map Join,两个分桶之间的join实现原理为Sort Merge Join算法。Hash Join和Sort Merge Join均为关系型数据库中常见的Join实现算法。Hash Join的原理相对简单,就是对参与join的一张表构建hash table,然后扫描另外一张表,然后进行逐行匹配。Sort Merge Join需要在两张按照关联字段排好序的表中进行
Hive中的SMB Map Join就是对两个分桶的数据按照上述思路进行Join操作。可以看出,SMB Map Join与Bucket Map Join相比,在进行Join操作时,Map端是无需对整个Bucket构建hash table,也无需在Map端缓存整个Bucket数据的,每个Mapper只需按顺序逐个key读取两个分桶的数据进行join即可
5.2 Map Join
Map Join有两种触发方式,一种是用户在SQL语句中增加hint提示,另外一种是Hive优化器根据参与join表的数据量大小,自动触发
**Hint提示,**用户可通过如下方式,指定通过map join算法,并且ta将作为map join中的小表。这种方式已经过时,不推荐使用。
1 | select /*+ mapjoin(ta) */ |
自动触发
Hive在编译SQL语句阶段,起初所有的join操作均采用Common Join算法实现。之后在物理优化阶段,Hive会根据每个Common Join任务所需表的大小判断该Common Join任务是否能够转换为Map Join任务,若满足要求,便将Common Join任务自动转换为Map Join任务。
但有些Common Join任务所需的表大小,在SQL的编译阶段是未知的(例如对子查询进行join操作),所以这种Common Join任务是否能转换成Map Join任务在编译阶是无法确定的。针对这种情况,Hive会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的Map Join任务以及原有的Common Join任务。最终具体采用哪个计划,是在运行时决定的
1 | --启动Map Join自动转换 |
优化案例
1 | # 示例 |
5.3 Bucket Map Join
Bucket Map Join不支持自动转换,发须通过用户在SQL语句中提供如下Hint提示,并配置如下相关参数,方可使用
1 | select /*+ mapjoin(ta) */ |
两张表都相对较大,若采用普通的Map Join算法,则Map端需要较多的内存来缓存数据,当然可以选择为Map段分配更多的内存,来保证任务运行成功。但是,Map端的内存不可能无上限的分配,所以当参与Join的表数据量均过大时,就可以考虑采用Bucket Map Join算法
5.4 Sort Merge Bucket Map Join
Sort Merge Bucket Map Join有两种触发方式,包括Hint提示和自动转换。Hint提示已过时,不推荐使用
1 | --启动Sort Merge Bucket Map Join优化 |
6、HQL之数据倾斜
6.1 数据倾斜概述
数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往同一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。
6.2 分组聚合导致的数据倾斜
Hive中未经优化的分组聚合,是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。如果group by分组字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题,由分组聚合导致的数据倾斜问题,有以下两种解决思路:Map-Side聚合和Skew-GroupBy优化
Map-Side聚合
开启Map-Side聚合后,数据会现在Map端完成部分聚合工作。这样一来即便原始数据是倾斜的,经过Map端的初步聚合后,发往Reduce的数据也就不再倾斜了。最佳状态下,Map-端聚合能完全屏蔽数据倾斜问题
1 | --启用map-side聚合,默认开启的,若想看到数据倾斜的现象,需要先将hive.map.aggr参数设置为false |
Skew-GroupBy优化
Skew-GroupBy的原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,完成部分聚合,第二个MR按照分组字段分区,完成最终聚合
1 | --启用分组聚合数据倾斜优化 |
6.3 Join导致的数据倾斜
未经优化的join操作,默认是使用common join算法,也就是通过一个MapReduce Job完成计算。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。如果关联字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。由join导致的数据倾斜问题,有如下几种解决方案
map join
使用map join算法,join操作仅在map端就能完成,没有shuffle操作,没有reduce阶段,自然不会产生reduce端的数据倾斜。该方案适用于大表join小表时发生数据倾斜的场景
1 | --启动Map Join自动转换 |
skew join
skew join的原理是,为倾斜的大key单独启动一个map join任务进行计算,其余key进行正常的common join
1 | --启用skew join优化 |
调整SQL语句
若参与join的两表均为大表,其中一张表的数据是倾斜的,此时也可通过以下方式对SQL语句进行相应的调整
1 | -- 相同的id的进入同一个reducer进行后序操作 |
7、HQL之任务并行度
对于一个分布式的计算任务而言,设置一个合适的并行度十分重要。Hive的计算任务由MapReduce完成,故并行度的调整需要分为Map端和Reduce端
7.1 Map端并行度
Map端的并行度,也就是Map的个数。是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。以下特殊情况可考虑调整map端并行度
- 查询的表中存在大量小文件
按照Hadoop默认的切片策略,一个小文件会单独启动一个map task负责计算。若查询的表中存在大量小文件,则会启动大量map task,造成计算资源的浪费。这种情况下,可以使用Hive提供的CombineHiveInputFormat,多个小文件合并为一个切片,从而控制map task个数,默认开启
1 | set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; |
- map端有复杂的查询逻辑
若SQL语句中有正则替换、json解析等复杂耗时的查询逻辑时,map端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增大map端的并行度,令map task多一些,每个map task计算的数据少一些
1 | --一个切片的最大值,默认256M |
7.2 Reduce端并行度
Reduce端的并行度,也就是Reduce个数。相对来说,更需要关注。Reduce端的并行度,可由用户自己指定,也可由Hive自行根据该MR Job输入的文件大小进行估算
1 | --指定Reduce端并行度,默认值为-1,表示用户未指定 |
8、HQL之小文件合并
小文件合并优化,分为两个方面,分别是Map端输入的小文件合并,和Reduce端输出的小文件合并
8.1 Map端输入文件合并
合并Map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个Map Task去处理。目的是防止为单个小文件启动一个Map Task,浪费计算资源
1 | --可将多个小文件切片,合并为一个切片,进而由一个map任务处理 |
8.2 Reduce输出文件合并
合并Reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少HDFS小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并
1 | --开启合并map only任务输出的小文件,默认false |
9、其他优化
9.1 CBO优化
CBO是指Cost based Optimizer,即基于计算成本的优化。在Hive中,计算成本模型考虑到了:数据的行数、CPU、本地IO、HDFS IO、网络IO等方面。Hive会计算同一SQL语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前CBO在hive的MR引擎下主要用于join的优化,例如多表join的join顺序
1 | --是否启用cbo优化,默认开启 |
9.2 谓词下推
谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量
1 | --是否启动谓词下推(predicate pushdown)优化 |
9.3 矢量化查询
Hive的矢量化查询优化,依赖于CPU的矢量化计算。Hive的矢量化查询,可以极大的提高一些典型查询场景(例如scans, filters, aggregates, and joins)下的CPU使用效率。参考:https://cwiki.apache.org/confluence/display/Hive/Vectorized+Query+Execution#VectorizedQueryExecution-Limitations
1 | -- 若执行计划中,出现“Execution mode: vectorized”字样,即表明使用了矢量化计算 |
9.4 Fetch抓取
Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:select * from emp;
在这种情况下,Hive可以简单地读取emp对应的存储目录下的文件,然后输出查询结果到控制台
1 | --是否在特定场景转换为fetch 任务 |
9.5 本地模式
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短
1 | --开启自动转换为本地模式 |
9.6 并行执行
Hive会将一个SQL语句转化成一个或者多个Stage,每个Stage对应一个MR Job。默认情况下,Hive同时只会执行一个Stage。但是某SQL语句可能会包含多个Stage,但这多个Stage可能并非完全互相依赖,也就是说有些Stage是可以并行执行的。此处提到的并行执行就是指这些Stage的并行执行
1 | --启用并行执行优化 |
9.7 严格模式
Hive可以通过设置某些参数防止危险操作
- 分区表不使用分区过滤
将hive.strict.checks.no.partition.filter
设置为true时,对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
- 使用order by没有limit过滤
将hive.strict.checks.orderby.no.limit
设置为true时,对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reduce中进行处理,强制要求用户增加这个limit语句可以防止Reduce额外执行很长一段时间(开启了limit可以在数据进入到Reduce之前就减少一部分数据)。
- 笛卡尔积
将hive.strict.checks.cartesian.product
设置为true时,会限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
1 | # 可以进入hive-site.xml修改 |
六、实战案例
1、同时在线人数问题
现有各直播间的用户访问记录表(live_events)如下,表中每行数据表达的信息为,一个用户何时进入了一个直播间,又在何时离开了该直播间,现要求统计各直播间最大同时在线人数
1 | drop table if exists live_events; |
2、会话划分问题
现有页面浏览记录表(page_view_events),表中有每个用户的每次页面访问记录,规定若同一用户的相邻两次访问记录时间间隔小于60s,则认为两次浏览记录属于同一会话。现有如下需求,为属于同一会话的访问记录增加一个相同的会话id字段
1 | drop table if exists page_view_events; |
3、间断连续登录用户问题
现有各用户的登录记录表(login_events),表中每行数据表达的信息是一个用户何时登录了平台,现要求统计各用户最长的连续登录天数,间断一天也算作连续,例如:一个用户在1,3,5,6登录,则视为连续6天登录。
1 | drop table if exists login_events; |
4、日期交叉问题
现有各品牌优惠周期表(promotion_info),其记录了每个品牌的每个优惠活动的周期,其中同一品牌的不同优惠活动的周期可能会有交叉,现要求统计每个品牌的优惠总天数,若某个品牌在同一天有多个优惠活动,则只按一天计算
1 | drop table if exists promotion_info; |