`
zy19982004
  • 浏览: 662088 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:252025
社区版块
存档分类
最新评论

Hadoop学习二十一:Hadoop-Hdfs DataNode 源码

 
阅读更多

一. DataNode类图

 

 

二. DateNode属性说明

  •  DatanodeProtocol namenode:RPC代理类。RPC.getProxy(nameNodeAddress)得到远程NameNode代理类。后续的versionRequest(),register(),sendHeartbeat(),blockReceived(),blockReport()都是namenode的行为。
  • org.apache.hadoop.ipc.Server ipcServer:RPC Server。conf里配置ipcAddr=0.0.0.0:50020,ipcServer = RPC.getServer(this, ipcAddr.getHostName()...),此DataNode作为RPC Server提供远程服务,提供接口ClientDatanodeProtocol和InterDatanodeProtocol中定义的服务。
  • HttpServer infoServer:a Jetty embedded server to answer http requests。
  • DataStorage storage:参考http://zy19982004.iteye.com/admin/blogs/1878758
  • DatanodeRegistration dnRegistration:DatanodeRegistration class conatins all information the Namenode needs to identify and verify a Datanode when it contacts the Namenode.后续的sendHeartbeat(),blockReceived(),blockReport()方法第一个参数就是DatanodeRegistration 。
  • FSDatasetInterface data:FSDataset的接口,管理着DataNode的所有block。参考http://zy19982004.iteye.com/admin/blogs/1880303
  • DataBlockScanner blockScanner:定时对数据块文件进行校验。
  • DataXceiverServer dataXceiverServer:参考http://zy19982004.iteye.com/admin/blogs/1881117
  • LinkedList<Block> receivedBlockList:成功创建的新block。
  • LinkedList<String> delHints:可以删掉该数据块的节点。DataXceiver.replaceBlock()和writeBlock成功后会调用DataNode.notifyNamenodeReceivedBlock()方法给这两个List add value.
    protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
        synchronized (receivedBlockList) {
          synchronized (delHints) {
            receivedBlockList.add(block);
            delHints.add(delHint);
            receivedBlockList.notifyAll();
          }
        }
      }
     

三. 代码顺序阅读

  1.  main()函数入口。
      public static void main(String args[]) {
        secureMain(args, null);
      }
  2. secureMain():初始化DataNode;加入线程里。
    public static void secureMain(String [] args, SecureResources resources) {
          DataNode datanode = createDataNode(args, null, resources);
          if (datanode != null)
            datanode.join();
          }
  3. createDataNode():初始化DataNode;启动该线程。
      public static DataNode createDataNode(String args[],
                Configuration conf, SecureResources resources) throws IOException {
        DataNode dn = instantiateDataNode(args, conf, resources);
        runDatanodeDaemon(dn);
        return dn;
      }
  4. instantiateDataNode():读取dataDirs;调用makeInstance()。makeInstance():check dataDirs;new DateNode。new DataNode():调用startDataNode()。
    public static DataNode instantiateDataNode(String args[],
                                          Configuration conf, 
                                          SecureResources resources) throws IOException {
            String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
            return makeInstance(dataDirs, conf, resources);
      }
    
    public static DataNode makeInstance(String[] dataDirs, Configuration conf, 
          SecureResources resources) throws IOException {
          DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
          return new DataNode(conf, dirs, resources);
      
      }
    
    DataNode(final Configuration conf,
               final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
              startDataNode(conf, dataDirs, resources);
          }
  5. startDataNode():1.根据conf对象初始化各个ip port;2.初始化DataStorage;3.初始化DatanodeRegistration;4.初始化DatanodeProtocol;5.handshake() get version and id info from the name-node,实际调用namenode.versionRequest();6.DataStorage.recoverTransitionRead(...)检查文件系统的状态并做恢复;7.初始化FSDatasetInterface;8.初始化DataBlockScanner;9.创建serverSocket,据此初始化DataXceiverServer;10.HttpServer,配置参数然后启动;11.初始化ipcServer;
  6. 接3,runDatanodeDaemon():注册DataNode;启动DataNode线程。
      public static void runDatanodeDaemon(DataNode dn) throws IOException {
        if (dn != null) {
          dn.register();
          dn.dataNodeThread = new Thread(dn, dnThreadName);
          dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
          dn.dataNodeThread.start();
        }
      }
    
    private void register() throws IOException {
            dnRegistration = namenode.register(dnRegistration);
      } 
  7. 进入run():启动DataXceiverServer线程;启动ipcServer;startDistributedUpgradeIfNeeded() Start distributed upgrade if it should be initiated by the data-node;offerService()开始工作。
      public void run() {
        LOG.info(dnRegistration + "In DataNode.run, data = " + data);
    
        dataXceiverServer.start();
        ipcServer.start();
            
        while (shouldRun) {
            startDistributedUpgradeIfNeeded();
            offerService();
            }
  8. offerService():1.定期heartBeatInterval调用namenode.sendHeartbeat()发送DataNode信息的心跳,并执行带回的DatanodeCommand;2.一直调用namenode.blockReceived()向Namenode报告最近接收到的数据块、要删除的多余块副本;3.定期调用namenode.blockReport()发送block report 告诉NameNode此DataNode上的block信息,并执行带回的DatanodeCommand。4.启动blockScannerThread线程。
    public void offerService() throws Exception {
         
    
        while (shouldRun) {
    	      try {
    	        long startTime = now();
    	        if (startTime - lastHeartbeat > heartBeatInterval) {
    	          lastHeartbeat = startTime;
    	          1.DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
    	                                                       data.getCapacity(),
    	                                                       data.getDfsUsed(),
    	                                                       data.getRemaining(),
    	                                                       xmitsInProgress.get(),
    	                                                       getXceiverCount());
    	          processCommand(cmds);
    	        }
    	            
    	        2.namenode.blockReceived(dnRegistration, blockArray, delHintArray);
                  blockReceived成功后删除receivedBlockListhe delHints里的数据	
    	        if (startTime - lastBlockReport > blockReportInterval) {
    	            3.DatanodeCommand cmd = namenode.blockReport(dnRegistration,
    	                    BlockListAsLongs.convertToArrayLongs(bReport));
    	            processCommand(cmd);
    	        }
    
                             blockScannerThread = new Daemon(blockScanner);
                             blockScannerThread.start();
    	    } // while (shouldRun)
        } // offerService
      } 
分享到:
评论

相关推荐

    《Hadoop大数据技术与应用》-HDFS常用方法和MapReduce程序.pdf

    【Hadoop大数据技术与应用】-HDFS常用方法和MapReduce程序 Hadoop是一个开源的分布式计算框架,它包括两个核心部分:HDFS(Hadoop Distributed File System)和MapReduce。本实验主要围绕HDFS的基本操作和MapReduce...

    《Hadoop大数据技术与应用》-HDFS常用方法和MapReduce程序.docx

    实验总结强调了不断实践和学习的重要性,通过这些操作,对Hadoop的本地文件操作和DataNode上的数据管理有了更深入的认识,同时也提升了对代码操作的熟练度。教师的批语可能鼓励学生继续深化理论知识与实践经验的结合...

    java-Hdfs体系结构与基本概念

    HDFS(Hadoop Distributed File System)是一种分布式文件系统,主要用于存储和管理大规模数据。HDFS 的设计初衷是为了满足高性能、高可靠性和高可扩展性的需求。 HDFS 体系结构 HDFS 的体系结构主要由两个组件...

    hadoop插件apache-hadoop-3.1.0-winutils-master.zip

    3. **HDFS初始化**:使用`winutils.exe`初始化HDFS文件系统,创建NameNode和DataNode的数据目录,这通常涉及到创建一些特定的目录结构并设置相应的权限。 4. **配置文件**:修改`conf/core-site.xml`和`conf/hdfs-...

    hadoop-3.3.1 windows + apache-hadoop-3.1.0-winutils-master.zip

    这些工具允许用户在本地执行Hadoop相关的操作,如启动DataNode、NameNode等服务,以及与HDFS交互。 在安装Hadoop 3.3.1 on Windows时,你需要进行以下步骤: 1. **下载并解压**:首先,你需要下载hadoop-3.3.1的...

    ansible-hdfs:用于安装 Cloudera HDFS 的 Ansible 角色

    角色变量hdfs_version - HDFS 版本hdfs_cloudera_distribution - Cloudera 发行版(默认: cdh5.4 ) hdfs_conf_dir - HDFS 的配置目录(默认: /etc/hadoop/conf ) hdfs_namenode - 确定节点是否为 HDFS NameNode ...

    大数据--Hadoop HDFS

    ### 大数据、Hadoop与HDFS详解 随着信息技术的快速发展和互联网的普及,数据量呈爆炸性增长态势。传统的数据处理工具和技术已无法满足如此大规模数据的存储、管理和分析需求。为此,Apache Hadoop应运而生,它提供...

    idea hadoop-hdfs插件

    《Idea Hadoop-HDFS插件详解与应用》 在大数据开发领域,Hadoop作为分布式计算框架的重要代表,其HDFS(Hadoop Distributed File System)是数据存储的核心组件。为了方便开发者在IDEA(IntelliJ IDEA)环境中更加...

    hadoop安装包centos6.5-hadoop-2.6.4.tar.gz

    首先,我们来看Hadoop的两个核心组件:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一个分布式文件系统,它将大文件分割成多个块,并在集群中的多台机器上存储这些块,提供高容错性和高可用性。...

    hadoop-common-2.6.0-bin-master.zip

    Hadoop是大数据处理领域的一个关键框架,...通过正确安装、配置和使用这个压缩包中的组件,开发者可以在没有Linux环境的情况下,也能顺利地进行Hadoop相关的工作,这对于学习和理解Hadoop的分布式计算原理非常有帮助。

    hadoop-hdfs.pdf

    ### HDFS(分布式文件系统) #### HDFS写入剖析: 1. **发请求**:客户端首先向NameNode发起写文件的请求。 2. **检查**:NameNode对客户端的请求进行合法性验证,包括检查文件是否已经存在以及客户端是否有相应的...

    Hadoop-hdfs下载

    ### Hadoop-HDFS环境下文件上传与下载操作指南 #### 一、Windows环境下配置Hadoop环境 **1.1 下载Hadoop** 为了在Windows环境下配置Hadoop环境,首先需要下载Hadoop软件包。推荐下载Hadoop 2.7.7版本,可以从清华...

    Hadoop-HDFS.docx

    【Hadoop-HDFS概述】 Hadoop-HDFS,全称为Hadoop Distributed File System,是一种分布式文件系统,旨在解决大规模数据存储和处理的问题。随着大数据时代的到来,单个操作系统无法有效地管理和维护海量数据,因此,...

    hadoop-2.5.0-cdh5.3.6-src.tar.gz

    - `hadoop-hdfs`:实现了分布式文件系统HDFS,包括NameNode、DataNode等组件。 - `hadoop-mapreduce`:包含MapReduce框架,包括JobTracker(在2.x版本中被YARN替代)、TaskTracker以及任务执行相关的类。 - `...

    hadoop-common-2.7.3-bin-master

    - **错误日志分析**:当遇到问题时,检查Hadoop的日志文件,如`logs/hadoop-root-namenode-localhost.out`和`logs/hadoop-root-datanode-localhost.out`,它们会提供错误信息帮助解决问题。 - **防火墙配置**:...

Global site tag (gtag.js) - Google Analytics