Hadoop3.x学习笔记
一、Hadoop入门
1、Hadoop概述
1.1 简介
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。主要解决海量数据的存储和海量数据的分析计算问题。广义上来说,Hadoop通常是指一个更广泛的概念——Hadoop生态圈。
下载地址:https://hadoop.apache.org/releases.html
1.2 hadoop优势
- 高可靠:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失
- 高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点
- 高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处
理速度 - 高容错性:能够自动将失败的任务重新分配
1.3 hadoop组成
1)HDFS架构概述
Hadoop Distributed File System,简称HDFS,是一个分布式文件系统
- NameNode(NN):存储文件的元数据,如文件名、文件目录结构、文件属性,以及每个文件的块列表和块所在的DataNode等
- DataNode(DN):在本地文件系统存储文件块数据,以及块数据的校验和
- Secondary NameNode(2NN):每隔一段时间对NameNode元数据备份
2)YARN 架构概述
YARN(Yet Another Resource Negotiater):另一种资源协调者,是Hadoop的资源管理器
3)MapReduce架构概述
MapReduce将计算过程分为两个阶段:Map和Reduce
- Map阶段并行处理输入数据
- Reduce阶段对Map结果进行汇总
4)HDFS、YARN、MapReduce三者关系
1.4 大数据技术生态体系
- Sqoop:Sqoop是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库(MySQL)间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中
- Flume:Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;
- Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统;
- Spark:Spark是当前最流行的开源大数据内存计算框架。可以基于Hadoop上存储的大数据进行计算
- Flink:Flink是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多
- Oozie:Oozie是一个管理Hadoop作业(job)的工作流程调度管理系统
- Hbase:HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库
- Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析
- ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等
2、环境准备(重点)
2.1 模板机配置
1 | # 安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G |
另外还需要配置网卡信息和主机名,这样更容易管理
1 | # 首先找到自己的网卡,可能不同机器不一样,比如我的就是eth0 |
2.2 模板创建
1 | # 克隆hadoop100机器,打开后修改IP地址和hostname,重启 |
3、本地运行模式(官方WordCount)
Hadoop运行模式包括:本地模式、伪分布式模式以及完全分布式模式。
- 本地模式:单机运行,只是用来演示一下官方案例。生产环境不用。
- **伪分布式模式:**也是单机运行,但是具备Hadoop集群的所有功能,一台服务器模拟一个分布式的环境。个别缺钱的公司用来测试,生产环境不用。
- **完全分布式模式:**多台服务器组成分布式环境。生产环境使用。
1 | cd /opt/module/hadoop-3.1.3/ |
4、Hadoop集群搭建(🌟重点)
4.1 环境准备(集群分发脚本xsync)
首先准备三台机器,我这里准备hadoop102,103,104,配置好相关hostname和ip,接下来复制相关软件和配置
1 | # =====================scp(secure copy)安全拷贝====================== |
4.2 SSH免密配置
1 | cd ~ |
4.3 集群配置
1)集群部署规划
- NameNode和SecondaryNameNode不要安装在同一台服务器
- ResourceManager也很消耗内存,不要和NameNode、SecondaryNameNode配置在同一台机器上。
hadoop102 | hadoop103 | hadoop104 | |
---|---|---|---|
HDFS | NameNode DataNode | DataNode | SecondaryNameNode DataNode |
YARN | NodeManager | ResourceManager NodeManager | NodeManager |
2)配置文件说明
Hadoop配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值
- 默认配置文件
要获取的默认文件 | 文件存放在Hadoop的jar包中的位置 |
---|---|
[core-default.xml] | hadoop-common-3.1.3.jar/core-default.xml |
[hdfs-default.xml] | hadoop-hdfs-3.1.3.jar/hdfs-default.xml |
[yarn-default.xml] | hadoop-yarn-common-3.1.3.jar/yarn-default.xml |
[mapred-default.xml] | hadoop-mapreduce-client-core-3.1.3.jar/mapred-default.xml |
- 自定义配置文件
core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml四个配置文件存放在$HADOOP_HOME/etc/hadoop
这个路径上,用户可以根据项目需求重新进行修改配置
3)配置集群
核心配置文件,配置core-site.xml
,cd $HADOOP_HOME/etc/hadoop
,vim core-site.xml
1 |
|
HDFS配置文件,配置hdfs-site.xml,vim hdfs-site.xml
1 |
|
YARN配置文件,配置yarn-site.xml,vim yarn-site.xml
1 |
|
MapReduce配置文件,配置mapred-site.xml,vim mapred-site.xml
1 |
|
4)在集群上分发配置好的Hadoop配置文件
1 | xsync /opt/module/hadoop-3.1.3/etc/hadoop/ |
4.4 启动集群
1 | # 首先需要配置workers,其中每一行都代表着一个worker节点的主机名或IP地址,hadoop通过这个文件来确定集群中有哪些节点并且将任务分配到这些节点上 |
然后进行集群基本测试
1 | hadoop fs -mkdir /input |
这里访问SNN:http://hadoop104:9868/status.html
,会发现界面有bug,需要进入hadoop104
1 | cd /opt/module/hadoop-3.1.3/share/hadoop/hdfs/webapps/static |
4.5 配置历史服务器
为了查看程序的历史运行情况,需要配置一下历史服务器,需要配置mapred-site.xml,在该文件里面增加如下配置
1 | <!-- 历史服务器端地址,rpc端口 --> |
然后进行操作
1 | # 分发配置 |
4.6 配置日志的聚集
日志聚集概念:应用运行完成以后,将程序运行日志信息上传到HDFS系统上
日志聚集功能好处:可以方便的查看到程序运行详情,方便开发调试。注意:开启日志聚集功能,需要重新启动 NodeManager 、ResourceManager 和HistoryServer
需要配置 yarn-site.xml,在该文件里面增加如下配置
1 | <!-- 开启日志聚集功能 --> |
启动执行
1 | # 分发配置 |
4.7 集群启动/停止方式总结
1 | # 各个模块分开启动/停止(配置ssh是前提)常用 |
如果集群长时间启动后想去关闭集群,会发现集群无法关闭,这是因为脚本关停是根据服务的pid来关闭的,而hadoop 的 pid 文件默认在 /tmp 文件下,一般七天被系统清理掉,所以导致关停失败,如果需要能使用脚本关停,都开始需要将pid保存到其他路径下
1 | # 来到hadoop程序配置文件 |
4.8 Hadoop集群常用脚本
Hadoop集群启停脚本(包含HDFS,Yarn,Historyserver)
1 | cd /home/atguigu/bin |
1 |
|
查看三台服务器Java进程脚本:jpsall
1 |
|
最后进行启动分发
1 | cd /home/atguigu/bin |
4.9 常用端口号说明
端口名称 | Hadoop2.x | Hadoop3.x |
---|---|---|
NameNode内部通信端口 | 8020 / 9000 | 8020 / 9000/9820 |
NameNode HTTP UI | 50070 | 9870 |
MapReduce查看执行任务端口 | 8088 | 8088 |
历史服务器通信端口 | 19888 | 19888 |
4.10 集群时间同步(可选)
如果服务器在公网环境(能连接外网),可以不采用集群时间同步,因为服务器会定期和公网时间进行校准;如果服务器在内网环境,必须要配置集群时间同步,否则时间久了,会产生时间偏差,导致集群执行任务时间不同步
1 | # ==================时间服务器配置(必须root用户)============== |
二、HDFS
1、HDFS概述
1.1 简介
HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。HDFS的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变
1.2 HDFS优缺点
优点
- 高容错性:一个数据会自动保存多个副本,某个副本丢失后,它可以自动恢复
- 适合处理大数据:无论是数据规模还是文件数量规模大都可以处理
- 可构建在廉价的机器上
缺点
-
不适合低延迟的数据访问:如毫秒级的存储数据是做不到的
-
无法高效地对大量小文件进行存储:
- 存储大量小文件时,会占用NameNode大量内存去存储文件目录信息和块信息,而NameNode的内存是有限的
- 小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标
-
不支持并发写入和文件的随机修改
- 不允许多个线程同时写同一文件
- 仅支持数据追加,不支持随机修改
1.3 HDFS组成架构
1.4 HDFS文件块大小(面试重点)
在Hadoop1.x中文件块大小默认为64M,而在2.x和3.x中为128M。当寻址时间为传输时间的1%时为最佳状态。文件块的大小太小,则会导致大文件被分割成太多块,增加寻址时间。而文件块大小太大,则会使得传输时间远大于寻址时间。文件块的大小主要取决于磁盘的传输速率
2、HDFS的Shell操作(重点)
1 | # 帮助查询某个命令 |
3、HDFS的API操作
3.1 环境准备
windows要启动hadooop,首先进去github官网下载对应版本的hadoop,这里我下载了3.1.0,下载完成后将路径的bin目录放入电脑的环境变量;验证Hadoop环境变量是否正常,双击winutils.exe,如果无报错即正常(报错大概率没有微软运行库,安装一下即可,还不行重启试试)
在IDEA中创建一个Maven工程HdfsClientDemo,并导入相应的依赖坐标
1 | <dependencies> |
在项目的src/main/resources目录下,新建一个文件,命名为"log4j.properties"
1 | log4j.rootLogger=INFO, stdout |
创建包名:com.atguigu.hdfs,创建HdfsClient类
1 | public class HdfsClient { |
3.2 HDFS的API案例实操
** HDFS文件上传(测试参数优先级)**
1 |
|
将hdfs-site.xml
拷贝到项目的resources资源目录下,测试发现参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的用户自定义配置文件 >(3)然后是服务器的自定义配置(xxx-site.xml) >(4)服务器的默认配置(xxx-default.xml)
1 |
|
接下来常规API编写
1 | //HDFS文件下载 |
4、HDFS的读写流程(面试重点)
4.1 HDFS 写数据流程
- 客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在
- NameNode返回是否可以上传
- 客户端请求第一个 Block上传到哪几个DataNode服务器上
- NameNode返回3个DataNode节点,分别为dn1、dn2、dn3
- 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成
- dn1、dn2、dn3逐级应答客户端
- 客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答
- 当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)
对于网络拓扑-节点距离计算,在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据,节点距离:两个节点到达最近的共同祖先的距离总和。
对于机架感知(副本存储节点选择),可以查看官方手册,Crtl + n
查找BlockPlacementPolicyDefault
,在该类中查找chooseTargetInOrder
方法
4.2 HDFS 读数据流程
- 客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址
- 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据
- DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)
- 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件
5、NameNode 和 SecondaryNameNode
5.1 NN和2NN工作机制
NameNode 中的元数据是存储在哪里的?如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。为了防止NameNode 节点断电,就会产生数据丢失。引入 Edits 文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到 Edits 中。这样,一旦 NameNode 节点断电,可以通过 FsImage 和 Edits 的合并,合成元数据。而新的节点SecondaryNamenode,专门用于FsImage和Edits的合并
5.2 Fsimage和Edits解析
另外会生成两个fsimage,其中另一个是在 Secondary NameNode 上生成的 Checkpoint 文件,它记录了上一次合并的文件系统状态信息。查看Fsimages文件和Edits文件如下
1 | # =====================oiv查看Fsimage文件================= |
5.3 CheckPoint时间设置
- 通常情况下,SecondaryNameNode每隔一小时执行一次,配置文件在
[hdfs-default.xml]
1 | <property> |
- 一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次
1 | <property> |
6、DataNode
6.1 DataNode工作机制
- 一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳
- DataNode启动后向NameNode注册,通过后,周期性(6小时)的向NameNode上报所有的块信息。
1 | <!--DN向NN汇报当前解读信息的时间间隔,默认6小时--> |
- 心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用
- 集群运行中可以安全加入和退出一些机器
6.2 数据完整性
- 当DataNode读取Block的时候,它会计算CheckSum
- 如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏
- Client读取其他DataNode上的Block
- 常见的校验算法crc(32),md5(128),sha1(160)
- DataNode在其文件创建后周期验证CheckSum
6.3 掉线时限参数设置
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒
1 | <property> |
三、MapReduce
1、MapReduce概述
1.1 MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析应用"的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1.2 优缺点
优点
- 易于编程。用户只需要关心业务逻辑
- 良好的扩展性。可以动态增加服务器,解决计算资源不够的问题
- 高容错性。任何一台集群挂掉,可以将任务转移到其他节点
- 适合海量数据计算(TB/PB)。几千台服务器共同计算
缺点
- 不擅长实时计算。MySQL擅长
- 不擅长流式计算。Flink擅长
- 不擅长DAG有向无关图计算。Spark擅长
1.3 MapReduce核心思想
一个完整的MapReduce程序在分布式运行时有三类实例进程:
- MrAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责Map阶段的整个数据处理流程
- ReduceTask:负责Reduce阶段的整个数据处理流程
1.4 序列化类型与编程规范
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
用户编写的程序分为3个部分:Mapper、Reducer和Driver
Mapper阶段
- 用户自定义的Mapper要继承自己的父类
- Mapper的输入是键值对的形式
- Mapper中的业务逻辑写在map()方法中
- Mapper的输出是键值对的形式
- map()方法对每一个<K, V>调用一次
Reducer阶段
- 用户自定义的Reducer要继承自己的父类
- Reducer的输入类型与Mapper的输入类型相对应
- Reducer的业务逻辑写在reduce()方法中
- ReduceTask进程对每一组相同的<K, V>调用一次reduce()方法
Driver阶段
- 相当于Yarn集群的客户端,用于提交整个程序到Yarn集群,提交的是封装
- MapReduce程序相关运行 参数的job对象
1.5 WordCount案例实操
通过IDEA创建工程,环境和上面API操作一致,创建包名com.atguigu.mapreduce.wordcount
1 | // 注意导入包都要选择mapreduce.xxx的,mapred是之前1.x的写法 |
集群上测试,用maven打jar包,需要添加的打包插件依赖,然后package
1 | <build> |
如果系统带有依赖,可以直接打包不用插件的jar包,修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/opt/module/hadoop-3.1.3路径
1 | sbin/start-dfs.sh |
2、Hadoop序列化
2.1 概述
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象
一般来说,"活的"对象只生存在内存里,关机断电就没有了。而且对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储"活的"对象,可以将"活的"对象发送到远程计算机。
为什么不用Java的序列化?Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)
2.2 自定义bean对象实现序列化接口(Writable)
企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
1 | public FlowBean() { |
- 重写序列化方法和重写反序列化方法,注意反序列化的顺序和序列化的顺序完全一致
1 |
|
- 要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用
- 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序
1 |
|
2.3 序列化案例实操
数据格式如下所示
1 | 1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200 |
创建包
1 | //1 继承 Writable 接口 |
3、MapReduce框架原理
3.1 InputFormat数据输入
切片与MapTask并行度决定机制:MapTask的并行度决定Map阶段任务处理并发度,进而影响到整个job的处理速度。数据块(block)是物理上把数据分成一块一块的,数据块是HDFS数据存储单位。
数据切片只是在逻辑上对输入数据进行分片,数据切片是MapReduce程序计算输入数据的单位。一个切片会对应启动一个MapTask。
Job提交流程源码详解
1 | waitForCompletion() |
FileInputFormat切片源码解析(input.getSplits(job)****)
注意这里小于1.1倍名义上是切成一块,其实存储还是两块,只是把小的那部分通过网络拉取过来形成一块处理
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
TextInputFormat实现类
FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型
CombineTextInputFormat实现类
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理
1 | // 首先准备四个小文件 |
3.2 MapReduce工作流程
3.3 Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle
Shuffle分区
- 如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
- 如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
- 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
- 分区号必须从零开始,逐一累加。
1 | //Partition分区案例实操 |
WritableComparable排序
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
- 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序
- 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
1 | // WritableComparable排序案例实操(全排序) |
Combiner合并
- Combiner是MR程序中Mapper和口Reducer之外的—种组件
- Combiner组件的父类就是Reducer
- Cobiner和口Reducer的区别在于运行的位置
- Combiner是在每一个MapTask所在的节点运行;
- Reducer是接收全局所有Mapper的输出结果;
- Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量
- Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv应该跟Reducer的输入kv类型要对应起来
1 | //Combiner合并案例实操 |
3.4 OutputFormat数据输出
OutputFomat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。默认是TextOutputFormat
1 | //自定义OutputFormat案例实操 |
3.5 MapReduce内核源码解析
MapTask工作机制
- Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value
- Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
- Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中
- Spill阶段:即"溢写",当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。溢写阶段详情:
- 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
- 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
- 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
- Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
ReduceTask工作机制
- Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
- Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可
- Reduce阶段:reduce()函数将计算结果写到HDFS上。
ReduceTask并行度决定机制
MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定。reduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置
1 | // 默认值是1,手动设置为4 |
MapTask & ReduceTask源码解析
1 | =================== MapTask =================== |
3.6 Join应用
现在有以下需求
首先是传统的方式
1 | // TableBean |
但这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。解决方案:Map端实现数据合并
Map Join适用于一张表十分小、一张表很大的场景。在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。具体办法:采用DistributedCache,在Mapper的setup阶段,将文件读取到缓存集合中
1 | //先在MapJoinDriver驱动类中添加缓存文件 |
3.7 数据清洗(ETL)
ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。**清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。**例如去除日志中字段个数小于等于11的日志
1 | 194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)" |
需要在Map阶段对输入的数据根据规则进行过滤清洗,开始编写代码
1 | //编写WebLogMapper类 |
3.8 MapReduce开发总结
-
输入数据接口:InputFormat
- 默认使用的实现类是:TextInputFormat
- TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
- CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
-
逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
-
Partitioner分区
- 有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
- 如果业务上有特别的需求,可以自定义分区
-
Comparable排序
- 当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法
- 部分排序:对最终输出的每一个文件进行内部排序
- 全排序:对所有数据进行排序,通常只有一个Reduce
- 二次排序:排序的条件有两个
-
Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果
-
逻辑处理接口:Reducer
用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
-
输出数据接口:OutputFormat
- 默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行
- 用户还可以自定义OutputFormat
4、Hadoop数据压缩
4.1 概述
压缩的好处和坏处
- 压缩的优点:以减少磁盘IO、减少磁盘存储空间
- 压缩的缺点:增加CPU开销
压缩原则
- 运算密集型的Job,少用压缩
- IO密集型的Job,多用压缩
4.2 MR支持的压缩编码
压缩格式 | Hadoop自带? | 算法 | 文件扩展名 | 是否可切片 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 是,直接使用 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
压缩性能的比较
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
4.3 压缩方式选择
压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片
4.4 压缩参数配置
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器
压缩格式 | 对应的编码/解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
要在Hadoop中启用压缩,可以配置如下参数
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs (在core-site.xml中配置) | 无,这个需要在命令行输入hadoop checknative查看 | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 企业多使用LZO或Snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | xxxxxxxxxx drop table if exists promotion_info;create table promotion_info( promotion_id string comment ‘优惠活动id’, brand string comment ‘优惠品牌’, start_date string comment ‘优惠活动开始日期’, end_date string comment ‘优惠活动结束日期’) comment ‘各品牌活动周期表’;select brand, sum(datediff(end_date,start_date)+1) promotion_day_countfrom( select brand, max_end_date, if(max_end_date is null or start_date>max_end_date,start_date,date_add(max_end_date,1)) start_date, end_date from ( select brand, start_date, end_date, max(end_date) over(partition by brand order by start_date rows between unbounded preceding and 1 preceding) max_end_date from promotion_info )t1)t2where end_date>start_dategroup by brand;sql | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
4.5 压缩实操案例
1 | // 在driver开启mapper压缩,其他都不需要变,即中间文件压缩了,不影响输出 |
四、Yarn
1、Yarn资源调度器
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序
1.1 Yarn基础架构
YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成
1.2 Yarn工作机制
1.3 Yarn调度器和调度算法
目前,Hadoop作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。Apache Hadoop3.1.3默认的资源调度器是Capacity Scheduler(具体设置详见:yarn-default.xml文件),CDH框架默认调度器是Fair Scheduler
先进先出调度器(FIFO)
FIFO调度器(First In First Out):单队列,根据提交作业的先后顺序,先来先服务
容量调度器(Capacity Scheduler)
Capacity Scheduler 是 Yahoo 开发的多用户调度器
公平调度器(Fair Scheduler)
Fair Schedulere 是 Facebook 开发的多用户调度器
公平调度器设计目标是:在时间尺度上,所有作业获得公平的资源。某一
时刻一个作业应获资源和实际获取资源的差距叫"缺额"。调度器会优先为缺额大的作业分配资源
DRF策略:DRF(Dominant Resource Fairness),我们之前说的资源,都是单一标准,例如只考虑内存(也是Yarn默认的情况)。但是很多时候我们资源有很多种,例如内存,CPU,网络带宽等,这样我们很难衡量两个应用应该分配的资源比例。
1.4 Yarn 常用命令
1 | # yarn状态的查询,除了可以在hadoop103:8088页面查看外,还可以通过命令操作 |
1.5 Yarn 生产环境核心参数
2、Yarn 案例实操
注:调整下列参数之前尽量拍摄 Linux 快照,否则后续的案例,还需要重写准备集群
2.1 Yarn生产环境核心参数配置案例
需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。需求分析:1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster,平均每个节点运行10个 / 3台 ≈ 3个任务(4 3 3),所以要改yarn-site.xml
配置参数如下
1 | <!-- 选择调度器,默认容量 --> |
如果集群的硬件资源不一致,要每个NodeManager单独配置
1 | # 重启集群 |
2.2 容量调度器多队列提交案例
- 需求1:default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%
- 需求2:配置队列优先级
在capacity-scheduler.xml
中配置如下
1 | <!--为新加队列添加必要属性--> |
分发配置文件,重启Yarn或者执行yarn rmadmin -refreshQueues
刷新队列,就可以看到两条队列,然后像Hive提交任务
1 | # 向Hive队列提交任务 |
最后说一下任务优先级,容量调度器,支持任务优先级的配置,在资源紧张时,优先级高的任务将优先获取资源。默认情况,Yarn将所有任务的优先级限制为0,若想使用任务的优先级功能,须开放该限制,修改yarn-site.xml
文件,增加以下参数
1 | <property> |
2.3 公平调度器案例
配置文件参考资料:https://hadoop.apache.org/docs/r3.1.3/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
任务队列放置规则参考资料:https://blog.cloudera.com/untangling-apache-hadoop-yarn-part-4-fair-scheduler-queue-basics/
创建两个队列,分别是test和atguigu(以用户所属组命名)。期望实现以下效果:若用户提交任务时指定队列,则任务提交到指定队列运行;若未指定队列,test用户提交的任务到root.group.test队列运行,atguigu提交的任务到root.group.atguigu队列运行(注:group为用户所属组)。公平调度器的配置涉及到两个文件,一个是yarn-site.xml
,另一个是公平调度器队列分配文件fair-scheduler.xml
(文件名可自定义)。
修改yarn-site.xml
文件,加入以下参数
1 | <property> |
配置fair-scheduler.xml
1 |
|
分发并测试提交
1 | xsync yarn-site.xml |
2.4 Yarn的Tool接口案例
自己写的jar包期望可以动态传参,结果报错,误认为是第一个输入参数,解决方法编写 Yarn 的 Tool 接口,首先编写maven项目,导包
1 | <dependencies> |
1 | //创建类WordCount并实现Tool接口 |
在HDFS上准备输入文件,假设为/input目录,向集群提交该Jar包yarn jar YarnDemo.jar com.atguigu.yarn.WordCountDriver wordcount /input /output
注意此时提交的3个参数,第一个用于生成特定的Tool,第二个和第三个为输入输出目录。此时如果我们希望加入设置参数,可以在wordcount后面添加参数,例如:yarn jar YarnDemo.jar com.atguigu.yarn.WordCountDriver wordcount -Dmapreduce.job.queuename=root.test /input /output1
五、Hadoop生产调优
1、HDFS核心参数
1.1 NameNode内存生产配置
NameNode内存计算,每个文件块大概占用150byte,一台服务器128G内存为例,能存储多少文件块呢?128 * 1024 * 1024 * 1024 / 150Byte ≈ 9.1亿
Hadoop2.x系列,配置NameNode内存,NameNode内存默认2000m,如果服务器内存4G,NameNode内存可以配置3g。在hadoop-env.sh文件中配置如下
1 | HADOOP_NAMENODE_OPTS=-Xmx3072m |
Hadoop3.x系列,配置NameNode内存,hadoop-env.sh中描述Hadoop的内存是动态分配的(hadoop-env.sh在etc/hadoop目录下),比如我4g内存分配了948MB堆内存
1 | # 查看NameNode和DataNode占用内存 |
1.2 NameNode心跳并发配置
每个节点启动时,都会发送心跳包给NN那么NameNode准备多少线程合适?vim hdfs-site.xml
1 | The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes. |
企业经验:dfs.namenode.handler.count=(20乘以以e为底log的ClusterSize),比如集群规模(DataNode台数)为3台时,此参数设置为21。可通过简单的python代码计算该值
1 | import math |
1.3 开启回收站配置
开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用
参数说明:
- 默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间
- 默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等
- 要求fs.trash.checkpoint.interval <= fs.trash.interval
1 | # 测试 |
2、HDFS集群压测
HDFS的读写性能主要受网络和磁盘影响比较大。为了方便测试,将hadoop102、hadoop103、hadoop104虚拟机网络都设置为100mbps。测试网速:来到hadoop102的/opt/module
目录,创建一个
1 | python -m SimpleHTTPServer |
2.1 测试HDFS写性能
测试内容:向HDFS集群写10个128M的文件
1 | # 注意:nrFiles n为生成mapTask的数量,生产环境一般可通过hadoop103:8088查看CPU核数,设置为(CPU核数 - 1) |
- Number of files:生成mapTask数量,一般是集群中(CPU核数-1),我们测试虚拟机就按照实际的物理内存-1分配即可
- Total MBytes processed:单个map处理的文件大小
- Throughput mb/sec:单个mapTak的吞吐量 。计算方式:处理的总文件大小/每一个mapTask写数据的时间累加;集群整体吞吐量:生成mapTask数量*单个mapTak的吞吐量
- Average IO rate mb/sec:平均mapTak的吞吐量。计算方式:每个mapTask处理文件大小/每一个mapTask写数据的时间,全部相加除以task数量
- IO rate std deviation:方差、反映各个mapTask处理的差值,越小越均衡
注意:如果测试过程中,出现异常,说明检查了虚拟内存,可以在yarn-site.xml中设置虚拟内存检测为false,分发配置并重启Yarn集群sbin/stop-yarn.sh
1 | <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> |
测试结果分析,由于副本1就在本地,所以该副本不参与测试,一共参与测试的文件:10个文件 * 2个副本 = 20个,压测后的速度:1.61,实测速度:1.61M/s * 20个文件 ≈ 32M/s,三台服务器的带宽:12.5 + 12.5 + 12.5 ≈ 30m/s,所有网络资源都已经用满。如果实测速度远远小于网络,并且实测速度不能满足工作需求,可以考虑采用固态硬盘或者增加磁盘个数
如果客户端不在集群节点,那就三个副本都参与计算
2.2 测试HDFS读性能
测试内容:读取HDFS集群10个128M的文件
1 | hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB |
3、HDFS多目录
3.1 NameNode多目录配置
NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性(非高可用)
在hdfs-site.xml文件中添加如下内容,注意:因为每台服务器节点的磁盘情况不同,所以这个配置配完之后,可以选择不分发
1 | <property> |
停止集群,删除三台节点的data和logs中所有数据,格式化集群并启动
1 | # 三台都要删除 |
3.2 DataNode多目录配置(重要)
DataNode可以配置成多个目录,每个目录存储的数据不一样(数据不是副本)
在hdfs-site.xml文件中添加如下内容
1 | <property> |
重启集群,查看结果
1 | # /opt/module/hadoop-3.1.3/data/dfs |
3.3 集群数据均衡之磁盘间数据均衡
生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性)
1 | # 生成均衡计划(我们只有一块磁盘,不会生成计划) |
4、HDFS集群扩容及缩容(重要)
4.1 添加白名单
白名单:表示在白名单的主机IP地址可以,用来存储数据,而非白名单的节点只能作为客户端访问,无法存储数据。企业中:配置白名单,可以尽量防止黑客恶意访问攻击
1 | # 在NameNode节点的/opt/module/hadoop-3.1.3/etc/hadoop目录下分别创建whitelist 和blacklist文件 |
4.2 新增服务器节点
随着公司业务的增长,数据量越来越大,原有的数据节点的容量已经不能满足存储数据的需求,需要在原有集群基础上动态添加新的数据节点
1 | # 环境准备 |
4.3 服务器间数据均衡
在企业开发中,如果经常在hadoop102和hadoop104上提交任务,且副本数为2,由于数据本地性原则,就会导致hadoop102和hadoop104数据过多,hadoop103存储的数据量小。另一种情况,就是新服役的服务器数据量比较少,需要执行集群均衡命令
1 | # 开启数据均衡命令 |
4.4 黑名单退役服务器
黑名单:表示在黑名单的主机IP地址不可以,用来存储数据。企业中:配置黑名单,用来退役服务器
1 | # 编辑/opt/module/hadoop-3.1.3/etc/hadoop目录下的blacklist文件 |
4.5 HDFS—集群迁移
1 | # 采用scp拷贝数据 |
5、HDFS存储优化
5.1 纠删码
HDFS默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。Hadoop3.x引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间
1 | [atguigu@hadoop102 hadoop-3.1.3]$ hdfs ec |
- RS-3-2-1024k:使用RS编码,每3个数据单元,生成2个校验单元,共5个单元,也就是说:这5个单元中,只要有任意的3个单元存在(不管是数据单元还是校验单元,只要总数=3),就可以得到原始数据。每个单元的大小是1024k=1024*1024=1048576
- RS-10-4-1024k:使用RS编码,每10个数据单元(cell),生成4个校验单元,共14个单元,也就是说:这14个单元中,只要有任意的10个单元存在(不管是数据单元还是校验单元,只要总数=10),就可以得到原始数据。每个单元的大小是1024k=1024*1024=1048576
- RS-6-3-1024k:使用RS编码,每6个数据单元,生成3个校验单元,共9个单元,也就是说:这9个单元中,只要有任意的6个单元存在(不管是数据单元还是校验单元,只要总数=6),就可以得到原始数据。每个单元的大小是1024k=1024*1024=1048576(默认开启)
- RS-LEGACY-6-3-1024k:策略和上面的RS-6-3-1024k一样,只是编码的算法用的是rs-legacy
- XOR-2-1-1024k:使用XOR编码(速度比RS编码快),每2个数据单元,生成1个校验单元,共3个单元,也就是说:这3个单元中,只要有任意的2个单元存在(不管是数据单元还是校验单元,只要总数= 2),就可以得到原始数据。每个单元的大小是1024k=1024*1024=1048576
1 | # 纠删码案例实操,将/input目录设置为RS-3-2-1024k策略 |
5.2 异构存储(冷热数据分离)
1 | # 查看当前有哪些存储策略可以用 |
我们这里进行测试,集群规划如下
节点 | 存储类型分配 |
---|---|
hadoop102 | RAM_DISK,SSD |
hadoop103 | SSD,DISK |
hadoop104 | DISK,RAM_DISK |
hadoop105 | ARCHIVE |
hadoop106 | ARCHIVE |
1 | <!--为hadoop102节点的hdfs-site.xml添加如下信息--> |
1 | # 启动集群,注意要删除data和logs目录 |
环境搭建完,下面详细介绍集中存储策略
1 | # =========================HOT存储策略案例======================== |
6、HDFS故障排除
6.1 NameNode故障处理
NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode
1 | # 故障模拟 |
6.2 集群安全模式&磁盘修复
- **安全模式:**文件系统只接受读数据请求,而不接受删除、修改等变更请求
- 进入安全模式场景
- NameNode在加载镜像文件和编辑日志期间处于安全模式;
- NameNode再接收DataNode注册时,处于安全模式
退出安全模式条件
- dfs.namenode.safemode.min.datanodes:最小可用datanode数量,默认0
- dfs.namenode.safemode.threshold-pct:副本数达到最小要求的block占系统总block数的百分比,默认0.999f。(只允许丢一个块)
- dfs.namenode.safemode.extension:稳定时间,默认值30000毫秒,即30秒
1 | # 集群处于安全模式,不能执行重要操作(写操作)。集群启动完成后,自动退出安全模式 |
6.3 慢磁盘监控
"慢磁盘"指的时写入数据非常慢的一类磁盘。其实慢性磁盘并不少见,当机器运行时间长了,上面跑的任务多了,磁盘的读写性能自然会退化,严重时就会出现写入数据延时的问题。如何发现慢磁盘?正常在HDFS上创建一个目录,只需要不到1s的时间。如果你发现创建目录超过1分钟及以上,而且这个现象并不是每次都有。只是偶尔慢了一下,就很有可能存在慢磁盘。可以采用如下方法找出是哪块磁盘慢:
- 通过心跳未联系时间,一般出现慢磁盘现象,会影响到DataNode与NameNode之间的心跳。正常情况心跳时间间隔是3s。超过3s说明有异常
- fio命令,测试磁盘的读写性能
1 | sudo yum install -y fio |
6.4 小文件归档
HDFS存储小文件弊端
每个文件均按块存储,每个块的元数据存储在NameNode的内存中,因此HDFS存储小文件会非常低效。因为大量的小文件会耗尽NameNode中的大部分内存。但注意,存储小文件所需要的磁盘容量和数据块的大小无关。例如,一个1MB的文件设置为128MB的块存储,实际使用的是1MB的磁盘空间,而不是128MB
解决存储小文件办法之一
HDFS存档文件或HAR文件,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少NameNode内存使用的同时,允许对文件进行透明的访问。具体说来,HDFS存档文件对内还是一个一个独立文件,对NameNode而言却是一个整体,减少了NameNode的内存
1 | # 案例实操 |
7、MapReduce生产经验(重点)
7.1 MapReduce慢的原因
apReduce程序效率的瓶颈在于两点:
-
计算机性能
CPU、内存、磁盘、网络
-
I/O操作优化
- 数据倾斜
- Map运行时间太长,导致Reduce等待过久
- 小文件过多
7.2 MapReduce常用调优参数
7.3 MapReduce数据倾斜问题
- 数据频率倾斜——某一个区域的数据量要远远大于其他区域
- 数据大小倾斜——部分记录的大小远远大于平均值
减少数据倾斜的方法
- 首先检查是否空值过多造成的数据倾斜,生产环境,可以直接过滤掉空值;如果想保留空值,就自定义分区,将空值加随机数打散。最后再二次聚合
- 能在map阶段提前处理,最好先在Map阶段处理。如:Combiner、MapJoin
- 设置多个reduce个数
8、Yarn生产经验
1 | # =========================Resourcemanager相关================ |
9、Hadoop综合调优
9.1 Hadoop小文件优化方法
HDFS上每个文件都要在NameNode上创建对应的元数据,这个元数据的大小约为150byte,这样当小文件比较多的时候,就会产生很多的元数据文件,**一方面会大量占用NameNode的内存空间,另一方面就是元数据文件过多,使得寻址索引速度变慢。**小文件过多,在进行MR计算时,会生成过多切片,需要启动过多的MapTask。每个MapTask处理的数据量小,导致MapTask的处理时间比启动时间还小,白白消耗资源。
Hadoop小文件解决方案有几种方案:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS(数据源头)
- Hadoop Archive(存储方向):是一个高效的将小文件放入HDFS块中的文件存档工具,能够将多个小文件打包成一个HAR文件,从而达到减少NameNode的内存使用
- CombineTextInputFormat(计算方向):CombineTextInputFormat用于将多个小文件在切片过程中生成一个单独的切片或者少量的切片
- 开启uber模式,实现JVM重用(计算方向):默认情况下,每个Task任务都需要启动一个JVM来运行,如果Task任务计算的数据量很小,我们可以让同一个Job的多个Task运行在一个JVM中,不必为每个Task都开启一个JVM
1 | # 未开启uber模式,在/input路径上上传多个小文件并执行wordcount程序 |
9.2 测试MapReduce计算性能
使用Sort程序评测MapReduce(注:一个虚拟机不超过150G磁盘尽量不要执行这段代码)
1 | # 使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数 |
9.3 企业开发场景案例
需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。需求分析:1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster平均每个节点运行10个 / 3台 ≈ 3个任务(4 3 3)
HDFS参数调优
1 | # 修改:hadoop-env.sh |
分别修改hdfs-site.xml和修改core-site.xml
1 | <!-- NameNode有一个工作线程池,默认值是10 --> |
分发配置:xsync hadoop-env.sh hdfs-site.xml core-site.xml
MapReduce参数调优
修改mapred-site.xml,修改完后分发
1 | <!-- 环形缓冲区大小,默认100m --> |
Yarn参数调优
修改yarn-site.xml配置参数如下,最后分发
1 | <!-- 选择调度器,默认容量 --> |
最后执行程序
1 | # 重启集群 |
六、Hadoop高可用集群部署
1、 HA 概述
HA(High Availablity),即高可用(7*24 小时不中断服务),实现高可用最关键的策略是消除单点故障。HA 严格来说应该分成各个组件的 HA
机制:HDFS 的 HA 和 YARN 的 HA。NameNode 主要在以下两个方面影响 HDFS 集群
- NameNode 机器发生意外,如宕机,集群将无法使用,直到管理员重启
- NameNode 机器需要升级,包括软件、硬件升级,此时集群也将无法使用
HDFS HA 功能通过配置多个 NameNodes(Active/Standby)实现在集群中对 NameNode 的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将 NameNode 很快的切换到另外一台机器。
2、HDFS-HA 集群搭建
2.1 集群规划
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
NameNode | Secondarynamenode | |
DataNode | DataNode | DataNode |
HA 的主要目的是消除 namenode 的单点故障,需要将 hdfs 集群规划成以下模样
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
NameNode | NameNode | NameNode |
DataNode | DataNode | DataNode |
2.2 HDFS-HA 核心问题
怎么保证三台 namenode 的数据一致?
- Fsimage:让一台 nn 生成数据,让其他机器 nn 同步
- Edits:需要引进新的模块 JournalNode 来保证 edtis 的文件的数据一致性
怎么让同时只有一台 nn 是 active,其他所有是 standby 的?
- 手动分配
- 自动分配
2nn 在 ha 架构中并不存在,定期合并 fsimage 和 edtis 的活谁来干?
- 由 standby 的 nn 来干
如果 nn 真的发生了问题,怎么让其他的 nn 上位干活?
- 手动故障转移
- 自动故障转移
3、HDFS-HA 手动模式
3.1 环境准备
首先按照之前的配置搭建好环境,在每个集群需要启动JournalNode
3.2 配置 HDFS-HA 集群
1 | # 在 opt 目录下创建一个 ha 文件夹 |
然后修改配置文件,这里windows可以使用sublime直接修改保存(File→SFTP/FTP→Browse Server,自行安装插件)
配置 core-site.xml
1 | <configuration> |
配置 hdfs-site.xml
1 |
|
最后分发配置好的 hadoop 环境到其他节点
1 | # 其他节点先创建好目录修改权限 |
3.3 启动 HDFS-HA 集群
1 | # 将 HADOOP_HOME 环境变量更改到 HA 目录(三台机器) |
4、HDFS-HA 自动模式
4.1 自动故障转移工作机制
自动故障转移为 HDFS 部署增加了两个新组件:ZooKeeper 和 ZKFailoverController(ZKFC)进程,如图所示。ZooKeeper 是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务
4.2 HDFS-HA 自动故障转移配置
首先配置hadoop集群的配置文件,在 hdfs-site.xml 中增加
1 | <!-- 启用 nn 故障自动转移 --> |
在 core-site.xml 文件中增加
1 | <!-- 指定 zkfc 要连接的 zkServer 地址 --> |
修改后分发配置文件
1 | xsync hadoop/ |
如果杀死进程不能切换active的话,可以修改下hdfs.site.xml里面的隔离方法
1 | <property> |
可以参考:https://blog.csdn.net/chanyue123/article/details/108637181
4.3 解决 NN 连接不上 JN 的问题
自动故障转移配置好以后,然后使用 start-dfs.sh 群起脚本启动 hdfs 集群,有可能会遇到 NameNode 起来一会后,进程自动关闭的问题。
查看报错日志,可分析出报错原因是因为 NameNode 连接不上 JournalNode,而利用 jps 命令查看到三台 JN 都已经正常启动,为什么 NN 还是无法正常连接到 JN 呢?这因为 start-dfs.sh 群起脚本默认的启动顺序是先启动 NN,再启动 DN,然后再启动 JN,并且默认的 rpc 连接参数是重试次数为 10,每次重试的间隔是 1s,也就是说启动完 NN以后的 10s 中内,JN 还启动不起来,NN 就会报错了
core-default.xml 里面有两个参数如下
1 | <!-- NN 连接 JN 重试次数,默认是 10 次 --> |
解决方案:遇到上述问题后,可以稍等片刻,等 JN 成功启动后,手动启动下三台,也可以在 core-site.xml 里面适当调大上面的两个参数
1 | hdfs --daemon start namenode |
5、YARN-HA 配置
官方文档:https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html
5.1 环境与核心问题
如果当前 active rm 挂了,其他 rm 怎么将其他 standby rm 上位
- 核心原理跟 hdfs 一样,利用了 zk 的临时节点
当前 rm 上有很多的计算程序在等待运行,其他的 rm 怎么将这些程序接手过来接着跑
- rm 会将当前的所有计算程序的状态存储在 zk 中,其他 rm 上位后会去读取,然后接着跑
5.2 配置 YARN-HA 集群
配置yarn-site.xml
1 | <configuration> |
分发并执行
1 | # 同步更新其他节点的配置信息 |
5.3 HADOOP HA 的最终规划
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
NameNode | NameNode | NameNode |
DataNode | DataNode | DataNode |
JournalNode | JournalNode | JournalNode |
Zookeeper | Zookeeper | Zookeeper |
ZKFC | ZKFC | ZKFC |