`
zhangjun5965
  • 浏览: 12273 次
社区版块
存档分类
最新评论

hadoop2.7.3源码解析之datanode注册和心跳机制

阅读更多

 http://zhangjun5965.iteye.com/admin/blogs/2386384

datanode注册和心跳

在hadoop启动的时候,正常的流程是先启动namenoe,然后启动datanode,因为namenode要接受datanode的注册,datanode的注册和心跳是在其启动的时候就开始了,入口方法自然是datanode的main方法。

通过跟踪代码发现在datanode的构造方法里,初始化了BlockPoolManager对象,通过其 blockPoolManager.refreshNamenodes(conf);从配置文件中获取该datanode相关的namenode信息,然后向其发生注册和心跳信息。

具体的是BlockPoolManager里面的startAll()方法,通过startAll方法,会将datanode上面的所有BPOfferService启动.

synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

通过BPOfferService的start方法循环启动BPServiceActor线程,以便BPServiceActor向其对应的namenode发送注册和心跳消息。

//This must be called only by blockPoolManager
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }

具体的实现方法自然在BPServiceActor的run方法中。

/**
   * 无论发生任何异常,都不会停止offerService方法,除非shouldRun 或者shouldServiceRun返回false
   * No matter what kind of exception we get, keep retrying to offerService().
   * That's the loop that connects to the NameNode and provides basic DataNode
   * functionality.
   *
   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
   * happen either at shutdown or due to refreshNamenodes.
   */
  @Override
  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          //连接到namenode,注册datanode
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
                .......
        }
      }

      runningState = RunningState.RUNNING;

      while (shouldRun()) {
        try {
          //心跳报告
          offerService();
        } catch (Exception ex) {
           .....
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      ...
    } finally {
     ....
    }
  }

datanode注册

datanode端的注册相对来说比较简单,通过跟踪connectToNNAndHandshake方法,最后调用的是DatanodeProtocolServerSideTranslatorPB.registerDatanode(RpcController, RegisterDatanodeRequestProto)方法。

在这里构造了一个DatanodeRegistration对象作为参数,里面包含了namenode需要验证datanode的一些基本信息。

最后通过datanode和namenode直接交互的协议DatanodeProtocol接口的registerDatanode方法向namenode发送rpc请求来注册datanode。

该方法最后将datanode注册namenode之后返回的结果处理后返回。

@Override
  public RegisterDatanodeResponseProto registerDatanode(
      RpcController controller, RegisterDatanodeRequestProto request)
      throws ServiceException {
    DatanodeRegistration registration = PBHelper.convert(request
        .getRegistration());
    DatanodeRegistration registrationResp;
    try {
      registrationResp = impl.registerDatanode(registration);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return RegisterDatanodeResponseProto.newBuilder()
        .setRegistration(PBHelper.convert(registrationResp)).build();
  }

datanode心跳

datanode的心跳操作主要是在offerService方法中,这个方法会一直运行下去直到shouldRun返回false。
心跳操作,首先向namenode发送心跳的请求,然后根据返回的结果更新一些信息,然后处理从namenode带回来的各种命令(DatanodeCommand数组)

发送心跳的方法是sendHeartBeat,最终调用了DatanodeProtocolServerSideTranslatorPB类的sendHeartbeat(RpcController, HeartbeatRequestProto)方法来发送心跳的请求。

@Override
  public HeartbeatResponseProto sendHeartbeat(RpcController controller,
      HeartbeatRequestProto request) throws ServiceException {
    HeartbeatResponse response;
    try {
        ......
      //发送心跳
      response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
          report, request.getCacheCapacity(), request.getCacheUsed(),
          request.getXmitsInProgress(),
          request.getXceiverCount(), request.getFailedVolumes(),
          volumeFailureSummary);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    
    //返回命令
    HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
        .newBuilder();
    DatanodeCommand[] cmds = response.getCommands();
    .............
  }

具体的发送心跳的协议我们来看下DatanodeProtocol类的sendHeartbeat方法。

/**
   *
   * sendHeartbeat方法告诉namenode这个datanode还活着,当然也包含一些状态的信息. namenode也会通过心跳信息给datanode发送一些命令,通过DatanodeCommand对象来封装命令。通过这些DatanodeCommand命令,DataNode做一些删除本地无效的块、或者将本地的块复制到其他的datanode的操作。
   *
   * sendHeartbeat() tells the NameNode that the DataNode is still
   * alive and well.  Includes some status info, too. 
   * It also gives the NameNode a chance to return 
   * an array of "DatanodeCommand" objects in HeartbeatResponse.
   * A DatanodeCommand tells the DataNode to invalidate local block(s), 
   * or to copy them to other DataNodes, etc.
   * @param registration datanode registration information datanode的注册信息
   * @param reports utilization report per storage datanode上每个存储的利用率报告(datanode可以配置多个存储目录,这些存储目录可以是异构的,如内存、disk、ssd等)
   * @param xmitsInProgress number of transfers from this datanode to others
   * @param xceiverCount number of active transceiver threads
   * @param failedVolumes number of failed volumes
   * @param volumeFailureSummary info about volume failures
   * @throws IOException on error
   */
  @Idempotent
  public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
                                       StorageReport[] reports,
                                       long dnCacheCapacity,
                                       long dnCacheUsed,
                                       int xmitsInProgress,
                                       int xceiverCount,
                                       int failedVolumes,
                                       VolumeFailureSummary volumeFailureSummary)
      throws IOException;

namenode接收注册和心跳信息

DatanodeManager简单介绍

首先介绍下DatanodeManager中的几个重要变量

/**
   * 
   * 
   * datanodeMap这个map主要是存储了StorageID到DatanodeDescriptor的映射关系,如注释所说,datanode向namenode注册的时候分为三种情况。
   * 
   *  1.如果是以一个新的storage id注册,直接放到map里。
   *  2.现有节点重复注册,这个时候用新的替换旧的就行
   *  3.一个已经存在的datanode以一个不同的storage id来注册
   * 
   * Stores the datanode -> block map.  
   * <p>
   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
   * storage id. In order to keep the storage map consistent it tracks 
   * all storages ever registered with the namenode.
   * A descriptor corresponding to a specific storage id can be
   * <ul> 
   * <li>added to the map if it is a new storage id;</li>
   * <li>updated with a new datanode started as a replacement for the old one 
   * with the same storage id; and </li>
   * <li>removed if and only if an existing datanode is restarted to serve a
   * different storage id.</li>
   * </ul> <br> 
   * <p>
   * Mapping: StorageID -> DatanodeDescriptor
   */
  private final NavigableMap<String, DatanodeDescriptor> datanodeMap
      = new TreeMap<String, DatanodeDescriptor>();

  /**
   * 集群的网络结构
   * Cluster network topology
   */
  private final NetworkTopology networktopology;

  /**
   *  host和DatanodeDescriptor和映射,因为一个节点上可能会有多个datanode。
   *  所以在Host2NodesMap内部其实是String到DatanodeDescriptor[]的映射,
   *  这样的话对这个节点上的某一个datanode进行增删改查操作一个普通的map就无能为力了,
   *  Host2NodesMap主要就是对这些增删改查操作做了一下封装
   *  Host names to datanode descriptors mapping.
   */
  private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();

namednoe接收注册的信息

不管是注册和心跳,datanode都是通过rpc调用了namenode中的同名方法,具体的实现是在NameNodeRpcServe中。

在registerDatanode方法中,调用了FSNamesystem的registerDatanode方法,最终的处理方法是在DatanodeManager.registerDatanode(DatanodeRegistration)中。

首先通过下面的两行代码获取了注册的datanode在datanodemanage中的两个map中的信息。

//这里用 nodeS表示从datanodeMap中获取的datanode信息
DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
//用nodeN表示从host2DatanodeMap获取的信息
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(nodeReg.getIpAddr(), nodeReg.getXferPort());

接下来对datanodemanage中datanodeMap的注释中说的的三种情况 分别进行处理

//此情况为数据节点存在,但是使用了新的存储ID
      if (nodeN != null && nodeN != nodeS) {
        NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
        // nodeN previously served a different data storage, 
        // which is not served by anybody anymore.
        //移除
        removeDatanode(nodeN);
        // physically remove node from datanodeMap
        //物理层面的移除,包含移除这个datanode下面的数据块等
        wipeDatanode(nodeN);
        nodeN = null;
      }

      //重复注册的情况,主要就是更新信息
      if (nodeS != null) {
            ................
          nodeS.updateRegInfo(nodeReg);

          nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
          nodeS.setDisallowed(false); // Node is in the include list

          //重新解析网络的位置信息
          // resolve network location
          if(this.rejectUnresolvedTopologyDN) {
            nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
            nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
          } else {
            nodeS.setNetworkLocation(
                resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
            nodeS.setDependentHostNames(
                getNetworkDependenciesWithDefault(nodeS));
          }
          getNetworkTopology().add(nodeS);
            
          // also treat the registration message as a heartbeat
          heartbeatManager.register(nodeS);
          incrementVersionCount(nodeS.getSoftwareVersion());
          startDecommissioningIfExcluded(nodeS);
          success = true;
         ...........................
      }



    //接下来处理从未注册的过的新节点注册的情况
    
    
            // resolve network location
            //解析网络信息,将其加入集群的网络拓扑中
        if(this.rejectUnresolvedTopologyDN) {
          nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
          nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
        } else {
          nodeDescr.setNetworkLocation(
              resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
          nodeDescr.setDependentHostNames(
              getNetworkDependenciesWithDefault(nodeDescr));
        }
        networktopology.add(nodeDescr);
        nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
  
        // register new datanode
        addDatanode(nodeDescr);
        // also treat the registration message as a heartbeat
        // no need to update its timestamp
        // because its is done when the descriptor is created
        heartbeatManager.addDatanode(nodeDescr);
        incrementVersionCount(nodeReg.getSoftwareVersion());
        startDecommissioningIfExcluded(nodeDescr);
        success = true;

上述对于三种注册的情况分别进行了处理,针对新的节点注册的情况,最终调用了addDatanode方法进行注册,主要就是在那两个map中添加相应的datanode信息,以及将datanode加到网络拓扑中。

/** Add a datanode. */
  void addDatanode(final DatanodeDescriptor node) {
    // To keep host2DatanodeMap consistent with datanodeMap,
    // remove  from host2DatanodeMap the datanodeDescriptor removed
    // from datanodeMap before adding node to host2DatanodeMap.
    
    //加到datanodeMap和 host2DatanodeMap中
    synchronized(datanodeMap) {
      host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
    }

    //加到网络中
    networktopology.add(node); // may throw InvalidTopologyException
    host2DatanodeMap.add(node);
    checkIfClusterIsNowMultiRack(node);

    if (LOG.isDebugEnabled()) {
      LOG.debug(getClass().getSimpleName() + ".addDatanode: "
          + "node " + node + " is added to datanodeMap.");
    }
  }

namenode 接收心跳信息

namenode处理心跳信息是在和datanode同名的方法sendHeartbeat中,最终的处理方法是DatanodeManager.handleHeartbeat方法。

心跳的具体流程如下:

1.先获取datanode的信息,判断是否允许连接(比如在exclude中),如果不允许的话,直接抛出异常。
2.判断是否注册过,如果没注册过,直接返回注册命令
3.更新datanode的信息,主要就是更新DatanodeDescriptor中的信息,如使用空间,剩余空间等。
4.检查是否处于安全模式 5.检查租约情况
6.生成复制的命令
7.生成删除的命令
8.生成缓存相关的命令
9.生成带宽相关的命令
10.返回所有的命令

相关的代码如下:

/** Handle heartbeat from datanodes. */
  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) throws IOException {
    synchronized (heartbeatManager) {
      synchronized (datanodeMap) {
        DatanodeDescriptor nodeinfo = null;
        try {
          //获取datanode的信息
          nodeinfo = getDatanode(nodeReg);
        } catch(UnregisteredNodeException e) {
          return new DatanodeCommand[]{RegisterCommand.REGISTER};
        }
        //是否允许连接
        // Check if this datanode should actually be shutdown instead. 
        if (nodeinfo != null && nodeinfo.isDisallowed()) {
          setDatanodeDead(nodeinfo);
          throw new DisallowedDatanodeException(nodeinfo);
        }

        //检查是否注册过
        if (nodeinfo == null || !nodeinfo.isRegistered()) {
          return new DatanodeCommand[]{RegisterCommand.REGISTER};
        }

        //更新datanode的信息,如使用空间,剩余空间等
        heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                         cacheCapacity, cacheUsed,
                                         xceiverCount, failedVolumes,
                                         volumeFailureSummary);

        //是否处于安全模式
        // If we are in safemode, do not send back any recovery / replication
        // requests. Don't even drain the existing queue of work.
        if(namesystem.isInSafeMode()) {
          return new DatanodeCommand[0];
        }

         //检查租约情况
        //check lease recovery
        BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
        if (blocks != null) {
          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
              blocks.length);
           .................................
          return new DatanodeCommand[] { brCommand };
        }

       
        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
        //生成复制命令
        //check pending replication
        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
              maxTransfers);
        if (pendingList != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
              pendingList));
        }
        //检查无效的数据块,生成删除命令
        //check block invalidation
        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
        if (blks != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
              blockPoolId, blks));
        }
        //生成缓存相关的命令
        boolean sendingCachingCommands = false;
        long nowMs = monotonicNow();
        if (shouldSendCachingCommands && 
            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
                timeBetweenResendingCachingDirectivesMs)) {
          DatanodeCommand pendingCacheCommand =
              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
                DatanodeProtocol.DNA_CACHE, blockPoolId);
          if (pendingCacheCommand != null) {
            cmds.add(pendingCacheCommand);
            sendingCachingCommands = true;
          }
          DatanodeCommand pendingUncacheCommand =
              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
          if (pendingUncacheCommand != null) {
            cmds.add(pendingUncacheCommand);
            sendingCachingCommands = true;
          }
          if (sendingCachingCommands) {
            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
          }
        }

        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
         //生成带宽相关的命令
        // check for balancer bandwidth update
        if (nodeinfo.getBalancerBandwidth() > 0) {
          cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
          // set back to 0 to indicate that datanode has been sent the new value
          nodeinfo.setBalancerBandwidth(0);
        }

        //返回所有的命令
        if (!cmds.isEmpty()) {
          return cmds.toArray(new DatanodeCommand[cmds.size()]);
        }
      }
    }

    return new DatanodeCommand[0];
  }
分享到:
评论

相关推荐

    hadoop2.7.3源码包,hadoop2.7.3zip源码包

    Hadoop 2.7.3是其一个稳定版本,提供了大量的功能改进和优化。这个源码包是针对这个特定版本的,包含了所有相关的Java源代码,使得开发者可以深入理解Hadoop的工作原理,进行定制化开发或者调试。 Hadoop主要由两个...

    Hadoop2.7.3安装文档

    ### Hadoop2.7.3安装指南 #### 一、Hadoop下载与环境准备 - **下载地址**:用户可以从Apache官方站点下载Hadoop 2.7.3版本的安装包,具体链接为:[http://hadoop.apache.org/releases.html]...

    Hadoop2.7.3源码Eclipse工程

    【标题】"Hadoop2.7.3源码Eclipse工程"揭示了这个压缩包包含的是Hadoop 2.7.3版本的源代码,并且是为Eclipse IDE准备的项目工程,便于开发者在Eclipse环境中进行源码级别的学习、调试和开发。 【描述】中的信息说明...

    hadoop-2.7.3源码和安装包.zip

    通过深入学习和实践这个Hadoop 2.7.3源码和安装包,你可以全面掌握Hadoop的内部机制,提升大数据处理和分布式计算能力。同时,这也是一个很好的起点,可以进一步研究其他大数据技术,如Spark、Flink等。

    hadoop2.7.3的源码包

    Hadoop 2.7.3 是一个开源框架,主要用于分布式存储和计算,是大数据处理领域的重要组成部分。这个源码包提供了Hadoop的核心组件,包括HDFS(Hadoop Distributed File System)、MapReduce以及YARN(Yet Another ...

    Hadoop2.7.3 Window10 hadoop.dll winutils.exe

    在本文中,我们将深入探讨如何在Windows 10操作系统中使用Hadoop 2.7.3版本进行开发,特别关注“hadoop.dll”和“winutils.exe”这两个关键组件。Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在...

    hadoop-2.7.3.tar.gz 下载 hadoop tar 包下载

    解压后,用户需要根据自己的系统环境进行适当的配置,包括设置Hadoop的环境变量(如HADOOP_HOME)、配置HDFS的namenode和datanode、以及MapReduce的jobtracker和tasktracker。 在Hadoop的配置中,需要修改`core-...

    hadoop2.7.3版本 windows下安装步骤和配置文件(不用cywin)

    在Windows系统上安装Hadoop 2.7.3版本是一项技术性的任务,尤其当避免使用像Cygwin这样的模拟Linux环境时。以下是一份详细的安装步骤和配置指南: 1. **下载Hadoop**: 首先,你需要从Apache官方网站下载Hadoop ...

    hadoop2.7.3_windows安装附件

    总的来说,安装Hadoop 2.7.3在Windows上虽然比在Linux上复杂,但通过理解和解决上述问题,你将能够成功搭建一个本地Hadoop开发环境。在学习和实践中,不断熟悉Hadoop的配置和操作,对于理解分布式计算原理和提升...

    windows-hadoop-2.7.3

    7. **启动Hadoop**:使用`start-dfs.cmd`和`start-yarn.cmd`命令启动Hadoop的DataNode、NameNode和ResourceManager等服务。 8. **测试运行**:可以通过`hadoop fs -ls /`命令检查HDFS是否正常工作,或者编写简单的...

    hadoop2.7.3 在windows下需要的hadoop.dll winutils.exe等文件(bin目录)

    以上就是在Windows环境下配置和使用Hadoop 2.7.3所需的关键步骤和知识点,包括`hadoop.dll`和`winutils.exe`的作用以及如何正确设置它们。通过这些步骤,你应该能够在Windows系统上建立一个基本的Hadoop开发或测试...

    hadoop 2.7.3 + Windows安装替换bin目录文件

    在本文中,我们将深入探讨如何在Windows环境下安装和配置Hadoop 2.7.3,特别关注替换bin目录中的文件这一关键步骤。Hadoop是一个开源的分布式计算框架,由Apache软件基金会开发,它允许在廉价硬件上处理和存储大量...

    hadoop-common-2.7.3-bin-master-windows

    1. 启动Hadoop守护进程,通常包括NameNode、DataNode和YARN的ResourceManager、NodeManager。 2. 配置HDFS的目录结构,如使用`winutils.exe fs -mkdir /user`创建用户目录。 3. 通过`hadoop fs -put`命令将本地文件...

    windows下hadoop2.7.3环境问题的解决(含说明)

    Hadoop 2.7.3是其稳定版本之一,提供了许多性能优化和功能改进。在Windows系统中搭建Hadoop,需要注意一些关键点: 1. **环境变量配置**:安装Hadoop前,需设置JAVA_HOME环境变量,指向Java开发工具的安装路径。...

    hadoop 2.7.3 32位

    7. **启动Hadoop**:依次启动DataNode、TaskTracker、NameNode和ResourceManager等服务。 8. **测试Hadoop**:运行`hadoop fs -ls /`命令检查HDFS是否正常工作,或者编写简单的MapReduce程序验证集群的计算功能。 ...

    mac hadoop安装hadoop 2.7.3

    6. **启动Hadoop**:启动Hadoop的各个服务,如DataNode、NameNode、ResourceManager等,一般通过`start-dfs.sh`和`start-yarn.sh`脚本完成。 接下来是Eclipse的集成,这有助于在IDE中开发和调试Hadoop程序。`...

    第一步-hadoop-hadoop-2.7.3在centos7上部署安装(单机版).zip

    在本教程中,我们将深入探讨如何在CentOS7操作系统上部署和安装Hadoop 2.7.3的单机版本,以及如何进一步搭建HBase单机版和Pinpoint监控工具,与SpringBoot应用进行整合。这是一个针对初学者的指南,旨在帮助理解...

    hadoop2.7.3在windows编译的bin文件夹

    这些配置文件定义了Hadoop集群的设置,如NameNode和DataNode的位置,以及内存和磁盘使用。 **5. 启动和停止Hadoop** 使用bin目录中的`start-dfs.sh`和`start-yarn.sh`命令来启动Hadoop服务,`stop-dfs.sh`和`stop-...

    hadoop2.7.3windows编译包,winutils.exe,bin包所有文件

    7. **启动Hadoop服务**: 运行`start-dfs.cmd`启动Hadoop的数据节点(datanode)和服务节点(namenode),然后运行`start-yarn.cmd`启动YARN资源管理器。 8. **测试Hadoop安装**: 使用`hadoop fs -ls /`命令检查HDFS...

Global site tag (gtag.js) - Google Analytics