Hadoop3.x源码解析
Hadoop3.x源码解析
一、RPC通信原理解析
1、概要
模拟RPC的客户端、服务端、通信协议三者如何工作的
2、代码demo
在HDFSClient项目基础上创建包名com.atguigu.rpc,创建RPC协议
1 | public interface RPCProtocol { |
创建RPC服务端
1 | public class NNServer implements RPCProtocol{ |
创建RPC客户端
1 | public class HDFSClient { |
测试,启动服务端,观察控制台打印:服务器开始工作,在控制台Terminal窗口输入,jps,查看到NNServer服务
启动客户端,观察客户端控制台打印:我是客户端,观察服务端控制台打印:服务端,创建路径/input
二、NameNode启动源码解析
1、概述
然后首先需要环境准备,导入依赖
1 | <dependencies> |
ctrl+h或者双击shift全局查找namenode,进入NameNode.java,然后ctrl + f,查找main方法,点击createNameNode
,点击最后default返回的NameNode
,点击initialize
初始化,核心方法就在里面
1 | protected void initialize(Configuration conf) throws IOException { |
2、启动9870端口服务
点击startHttpServer
1 | private void startHttpServer(final Configuration conf) throws IOException { |
点击startHttpServer方法中的httpServer.start();
1 | void start() throws IOException { |
点击setupServlets,这里就是一些控制台的各个功能页跳转
1 | private static void setupServlets(HttpServer2 httpServer, Configuration conf) { |
3、加载镜像文件和编辑日志
点击loadNamesystem
1 | protected void loadNamesystem(Configuration conf) throws IOException { |
4、初始化NN的RPC服务端
点击createRpcServer,如第一章的服务端RPC开启,为客户端提供服务支持,客户端可以通过rpc协议发送指令
1 | protected NameNodeRpcServer createRpcServer(Configuration conf) |
5、NN启动资源检查
点击startCommonServices
1 | private void startCommonServices(Configuration conf) throws IOException { |
点击startCommonServices
1 | void startCommonServices(Configuration conf, HAContext haContext) throws IOException { |
点击NameNodeResourceChecker
1 | public NameNodeResourceChecker(Configuration conf) throws IOException { |
点击checkAvailableResources
1 | void checkAvailableResources() { |
if (!resource.isResourceAvailable()),ctrl+alt+B可以查看其实现类
1 | public boolean isResourceAvailable() { |
6、NN对心跳超时判断
Ctrl + n 搜索namenode,ctrl + f搜索startCommonServices
,点击namesystem.startCommonServices(conf, haContext);
点击blockManager.activate(conf, completeBlocksTotal);
点击datanodeManager.activate(conf);
1 | void activate(final Configuration conf) { |
7、安全模式
1 | void startCommonServices(Configuration conf, HAContext haContext) throws IOException { |
点击getCompleteBlocksTotal
1 | public long getCompleteBlocksTotal() { |
点击activate
1 | public void activate(Configuration conf, long blockTotal) { |
点击setBlockTotal
1 | void setBlockTotal(long total) { |
点击areThresholdsMet
1 | private boolean areThresholdsMet() { |
三、DataNode启动源码解析
1、概述
工作机制
启动源码流程
查找DataNode.class
1 | public static void main(String args[]) { |
2、初始化DataXceiverServer
点击initDataXceiver
1 | private void initDataXceiver() throws IOException { |
3、初始化HTTP服务
点击startInfoServer();
1 | private void startInfoServer() |
4、初始化DN的RPC服务端
点击initIpcServer
1 | private void initIpcServer() throws IOException { |
5、DN向NN注册
点击refreshNamenodes
1 | void refreshNamenodes(Configuration conf) |
点击startAll()
1 | synchronized void startAll() throws IOException { |
ctrl + f 搜索run方法
1 | public void run() { |
返回,点击register
1 | void register(NamespaceInfo nsInfo) throws IOException { |
回到NN,搜索NameNodeRpcServer
1 | public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) |
6、向NN发送心跳
点击BPServiceActor.java中的run方法中的offerService方法
1 | private void offerService() throws Exception { |
回到NN,搜索NameNodeRpcServer类,ctrl + f 在NameNodeRpcServer.java中搜索sendHeartbeat
1 | public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, |
四、HDFS上传源码解析
1、概述
HDFS的写数据流程
HDFS上传源码解析
2、create创建过程
2.1 DN向NN发起创建请求
1 |
|
ctrl+alt+B选择DistributedFileSystem实现方法
1 |
|
点击newStreamForCreate,进入DFSOutputStream.java
1 | static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, |
2.2 NN处理DN的创建请求
点击create
1 | HdfsFileStatus create(String src, FsPermission masked, |
查找create实现类,点击NameNodeRpcServer,在NameNodeRpcServer.java中搜索create
1 | public HdfsFileStatus create(String src, FsPermission masked, |
2.3 DataStreamer启动流程
NN处理完DN请求后,再次回到DN端,启动对应的线程
1 | //DFSOutputStream.java |
点击newStreamForCreate方法中的out.start(),进入DFSOutputStream.java
1 | protected synchronized void start() { |
3、write上传过程
3.1 向DataStreamer的队列里面写数据
1 |
|
DataStreamer.java
1 | void queuePacket(DFSPacket packet) { |
3.2 建立管道之机架感知(块存储位置)
全局查找DataStreamer,搜索run方法
1 |
|
回到namenode,点击NameNodeRpcServer,在该类中搜索addBlock
1 | public LocatedBlock addBlock(String src, String clientName, |
3.3 建立管道之Socket发送
点击DataStreamer的nextBlockOutputStream
1 | protected LocatedBlock nextBlockOutputStream() throws IOException { |
3.4 建立管道之Socket接收
全局查找DataXceiverServer.java,在该类中查找run方法
1 | public void run() { |
点击DataXceiver(线程),查找run方法
1 | public void run() { |
ctrl +alt +b 查找writeBlock的实现类DataXceiver.java
1 | public void writeBlock(... ...) throws IOException { |
3.5 客户端接收DN写数据应答Response
全局查找DataStreamer,搜索run方法
1 |
|
点击response再点击ResponseProcessor,ctrl + f 查找run方法
1 | public void run() { |
五、Yarn源码解析
1、概述
YARN工作机制
yarn源码解析
2、Yarn客户端向RM提交作业
在wordcount程序的驱动类中点击
1 | boolean result = job.waitForCompletion(true); |
创建提交环境,ctrl + alt +B 查找submitJob实现类,YARNRunner.java
1 | public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) |
向Yarn提交,点击submitJob方法中的submitApplication(),ctrl + alt +B 查找submitApplication实现类,YarnClientImpl.java
1 | public ApplicationId |
3、RM启动MRAppMaster
1 | <dependency> |
查找MRAppMaster,搜索main方法
1 | public static void main(String[] args) { |
ctrl + alt +B 查找serviceInit实现类,MRAppMaster.java
1 | protected void serviceInit(final Configuration conf) throws Exception { |
点击MRAppMaster.java 中的initAndStartAppMaster 方法中的appMaster.start();
1 | public void start() { |
ctrl + alt +B 查找handle实现类,GenericEventHandler.java
1 | class GenericEventHandler implements EventHandler<Event> { |
4、调度器任务执行(YarnChild)
查找YarnChild,搜索main方法
1 | public static void main(String[] args) throws Throwable { |
ctrl + alt +B 查找run实现类,maptask.java
1 | public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) |
启动ReduceTask,在YarnChild.java类中的main方法中ctrl + alt +B 查找run实现类,reducetask.java
1 | public void run(JobConf job, final TaskUmbilicalProtocol umbilical) |
六、MapReduce源码解析
之前有介绍
1、Job提交流程源码和切片源码详解
1 | //Job提交流程源码详解 |
FileInputFormat 切片源码解析(input.getSplits(job))
2、MapTask & ReduceTask 源码解析
1 | //MapTask源码解析流程 |
七、Hadoop源码编译
1、环境准备
具体可以看build.txt文件,修改源码中的HDFS副本数的设置
回到Centos系统,Jar包准备(Hadoop源码、JDK8、Maven、Ant 、Protobuf)
- hadoop-3.1.3-src.tar.gz
- jdk-8u212-linux-x64.tar.gz
- apache-maven-3.6.3-bin.tar.gz
- protobuf-2.5.0.tar.gz(序列化的框架)
- cmake-3.17.0.tar.gz
2、工具包安装
注意:所有操作必须在root用户下完成
1 | # 分别创建/opt/software/hadoop_source和/opt/module/hadoop_source路径 |
3、编译源码
1 | # 进入解压后的Hadoop源码目录下 |