一. DataNode类图
二. DateNode属性说明
- DatanodeProtocol namenode:RPC代理类。RPC.getProxy(nameNodeAddress)得到远程NameNode代理类。后续的versionRequest(),register(),sendHeartbeat(),blockReceived(),blockReport()都是namenode的行为。
- org.apache.hadoop.ipc.Server ipcServer:RPC Server。conf里配置ipcAddr=0.0.0.0:50020,ipcServer = RPC.getServer(this, ipcAddr.getHostName()...),此DataNode作为RPC Server提供远程服务,提供接口ClientDatanodeProtocol和InterDatanodeProtocol中定义的服务。
- HttpServer infoServer:a Jetty embedded server to answer http requests。
- DataStorage storage:参考http://zy19982004.iteye.com/admin/blogs/1878758。
- DatanodeRegistration dnRegistration:DatanodeRegistration class conatins all information the Namenode needs to identify and verify a Datanode when it contacts the Namenode.后续的sendHeartbeat(),blockReceived(),blockReport()方法第一个参数就是DatanodeRegistration 。
- FSDatasetInterface data:FSDataset的接口,管理着DataNode的所有block。参考http://zy19982004.iteye.com/admin/blogs/1880303。
- DataBlockScanner blockScanner:定时对数据块文件进行校验。
- DataXceiverServer dataXceiverServer:参考http://zy19982004.iteye.com/admin/blogs/1881117。
- LinkedList<Block> receivedBlockList:成功创建的新block。
- LinkedList<String> delHints:可以删掉该数据块的节点。DataXceiver.replaceBlock()和writeBlock成功后会调用DataNode.notifyNamenodeReceivedBlock()方法给这两个List add value.
protected void notifyNamenodeReceivedBlock(Block block, String delHint) { synchronized (receivedBlockList) { synchronized (delHints) { receivedBlockList.add(block); delHints.add(delHint); receivedBlockList.notifyAll(); } } }
三. 代码顺序阅读
- main()函数入口。
public static void main(String args[]) { secureMain(args, null); }
- secureMain():初始化DataNode;加入线程里。
public static void secureMain(String [] args, SecureResources resources) { DataNode datanode = createDataNode(args, null, resources); if (datanode != null) datanode.join(); }
- createDataNode():初始化DataNode;启动该线程。
public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { DataNode dn = instantiateDataNode(args, conf, resources); runDatanodeDaemon(dn); return dn; }
- instantiateDataNode():读取dataDirs;调用makeInstance()。makeInstance():check dataDirs;new DateNode。new DataNode():调用startDataNode()。
public static DataNode instantiateDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { String[] dataDirs = conf.getStrings(DATA_DIR_KEY); return makeInstance(dataDirs, conf, resources); } public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException { DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission); return new DataNode(conf, dirs, resources); } DataNode(final Configuration conf, final AbstractList<File> dataDirs, SecureResources resources) throws IOException { startDataNode(conf, dataDirs, resources); }
- startDataNode():1.根据conf对象初始化各个ip port;2.初始化DataStorage;3.初始化DatanodeRegistration;4.初始化DatanodeProtocol;5.handshake() get version and id info from the name-node,实际调用namenode.versionRequest();6.DataStorage.recoverTransitionRead(...)检查文件系统的状态并做恢复;7.初始化FSDatasetInterface;8.初始化DataBlockScanner;9.创建serverSocket,据此初始化DataXceiverServer;10.HttpServer,配置参数然后启动;11.初始化ipcServer;
- 接3,runDatanodeDaemon():注册DataNode;启动DataNode线程。
public static void runDatanodeDaemon(DataNode dn) throws IOException { if (dn != null) { dn.register(); dn.dataNodeThread = new Thread(dn, dnThreadName); dn.dataNodeThread.setDaemon(true); // needed for JUnit testing dn.dataNodeThread.start(); } } private void register() throws IOException { dnRegistration = namenode.register(dnRegistration); }
- 进入run():启动DataXceiverServer线程;启动ipcServer;startDistributedUpgradeIfNeeded() Start distributed upgrade if it should be initiated by the data-node;offerService()开始工作。
public void run() { LOG.info(dnRegistration + "In DataNode.run, data = " + data); dataXceiverServer.start(); ipcServer.start(); while (shouldRun) { startDistributedUpgradeIfNeeded(); offerService(); }
- offerService():1.定期heartBeatInterval调用namenode.sendHeartbeat()发送DataNode信息的心跳,并执行带回的DatanodeCommand;2.一直调用namenode.blockReceived()向Namenode报告最近接收到的数据块、要删除的多余块副本;3.定期调用namenode.blockReport()发送block report 告诉NameNode此DataNode上的block信息,并执行带回的DatanodeCommand。4.启动blockScannerThread线程。
public void offerService() throws Exception { while (shouldRun) { try { long startTime = now(); if (startTime - lastHeartbeat > heartBeatInterval) { lastHeartbeat = startTime; 1.DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount()); processCommand(cmds); } 2.namenode.blockReceived(dnRegistration, blockArray, delHintArray); blockReceived成功后删除receivedBlockListhe delHints里的数据 if (startTime - lastBlockReport > blockReportInterval) { 3.DatanodeCommand cmd = namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport)); processCommand(cmd); } blockScannerThread = new Daemon(blockScanner); blockScannerThread.start(); } // while (shouldRun) } // offerService }
相关推荐
【Hadoop大数据技术与应用】-HDFS常用方法和MapReduce程序 Hadoop是一个开源的分布式计算框架,它包括两个核心部分:HDFS(Hadoop Distributed File System)和MapReduce。本实验主要围绕HDFS的基本操作和MapReduce...
实验总结强调了不断实践和学习的重要性,通过这些操作,对Hadoop的本地文件操作和DataNode上的数据管理有了更深入的认识,同时也提升了对代码操作的熟练度。教师的批语可能鼓励学生继续深化理论知识与实践经验的结合...
HDFS(Hadoop Distributed File System)是一种分布式文件系统,主要用于存储和管理大规模数据。HDFS 的设计初衷是为了满足高性能、高可靠性和高可扩展性的需求。 HDFS 体系结构 HDFS 的体系结构主要由两个组件...
3. **HDFS初始化**:使用`winutils.exe`初始化HDFS文件系统,创建NameNode和DataNode的数据目录,这通常涉及到创建一些特定的目录结构并设置相应的权限。 4. **配置文件**:修改`conf/core-site.xml`和`conf/hdfs-...
这些工具允许用户在本地执行Hadoop相关的操作,如启动DataNode、NameNode等服务,以及与HDFS交互。 在安装Hadoop 3.3.1 on Windows时,你需要进行以下步骤: 1. **下载并解压**:首先,你需要下载hadoop-3.3.1的...
角色变量hdfs_version - HDFS 版本hdfs_cloudera_distribution - Cloudera 发行版(默认: cdh5.4 ) hdfs_conf_dir - HDFS 的配置目录(默认: /etc/hadoop/conf ) hdfs_namenode - 确定节点是否为 HDFS NameNode ...
### 大数据、Hadoop与HDFS详解 随着信息技术的快速发展和互联网的普及,数据量呈爆炸性增长态势。传统的数据处理工具和技术已无法满足如此大规模数据的存储、管理和分析需求。为此,Apache Hadoop应运而生,它提供...
《Idea Hadoop-HDFS插件详解与应用》 在大数据开发领域,Hadoop作为分布式计算框架的重要代表,其HDFS(Hadoop Distributed File System)是数据存储的核心组件。为了方便开发者在IDEA(IntelliJ IDEA)环境中更加...
首先,我们来看Hadoop的两个核心组件:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一个分布式文件系统,它将大文件分割成多个块,并在集群中的多台机器上存储这些块,提供高容错性和高可用性。...
Hadoop是大数据处理领域的一个关键框架,...通过正确安装、配置和使用这个压缩包中的组件,开发者可以在没有Linux环境的情况下,也能顺利地进行Hadoop相关的工作,这对于学习和理解Hadoop的分布式计算原理非常有帮助。
### HDFS(分布式文件系统) #### HDFS写入剖析: 1. **发请求**:客户端首先向NameNode发起写文件的请求。 2. **检查**:NameNode对客户端的请求进行合法性验证,包括检查文件是否已经存在以及客户端是否有相应的...
### Hadoop-HDFS环境下文件上传与下载操作指南 #### 一、Windows环境下配置Hadoop环境 **1.1 下载Hadoop** 为了在Windows环境下配置Hadoop环境,首先需要下载Hadoop软件包。推荐下载Hadoop 2.7.7版本,可以从清华...
【Hadoop-HDFS概述】 Hadoop-HDFS,全称为Hadoop Distributed File System,是一种分布式文件系统,旨在解决大规模数据存储和处理的问题。随着大数据时代的到来,单个操作系统无法有效地管理和维护海量数据,因此,...
- `hadoop-hdfs`:实现了分布式文件系统HDFS,包括NameNode、DataNode等组件。 - `hadoop-mapreduce`:包含MapReduce框架,包括JobTracker(在2.x版本中被YARN替代)、TaskTracker以及任务执行相关的类。 - `...
- **错误日志分析**:当遇到问题时,检查Hadoop的日志文件,如`logs/hadoop-root-namenode-localhost.out`和`logs/hadoop-root-datanode-localhost.out`,它们会提供错误信息帮助解决问题。 - **防火墙配置**:...