`

第七章:小朱笔记hadoop之源码分析-hdfs分析 第九节:block Recovery过程分析

 
阅读更多

第七章:小朱笔记hadoop之源码分析-hdfs分析

第九节:block Recovery过程分析

Lease Recovery Algorithm lease recovery算法:

1) Namenode retrieves lease information 
        name node查找到lease的信息
2) For each file f in the lease, consider the last block b off
	对于lease中的每一个文件,获取其最后一个block b进行以下处理
2.1)  Get the datanodes which contains b
	获取包含block b 的全部data node
2.2)  Assign one of the datanodes as the primary datanode p
	获取一个data node作为其primary data node
2.3)  p obtains a new generation stamp form the namenode
	primary datanode向 nn获取一个新的generation stamp
2.4)  p get the block info from each datanode
	primary datanode 向每一个dn获取block info
2.5)  p computes the minimum block length
	primary datanode 计算最小块的长度
2.6) p updates the datanodes, which have a valid generation stamp,with the new generation stamp and the minimum block length
	primary datanode 用最小块的长度和新生成的 genetation stamp来更新 dn 
2.7)  p acknowledges the namenode the update results
	primary datanode  ack nn update的结果
2.8)  Namenode updates the BlockInfo
	  nn 更新 block info
2.9)  Namenode removes f from the lease and removes the lease once all files have been removed
	 nn 删除文件 f 的lease
2.10) Namenode commit changes to edit log
	nn向edit log提交 lease 这个change 

      一个client在持有某个文件的Lease情况下,如果写入数据过程中发生宕机,或者其他事故,导致无法继续对文件进行写入。由于该文件的Lease是由namenode来维护的,此时namenode认为该文件正在被该client持有,所以其他client对该文件是不允许进行写入的。
      为了解决上面的问题,namenode中对某个client对应某个文件的Lease是有一个限期的,一旦过了这 个限期,该Lease没有发生任何改变(比如更新时间),没有写入任何数据,那么namenode就认为该lease对应的client发生了异常,需要在 namenode端对这个Lease进行释放,以便其他的client能够对文件进行写入操作。释放Lease的时候会处理正在写入的文件,把该文件的最后一个block和targets datanode数组加入到需要recovery的队列中,进行处理之后等待目标datanode心跳获取该数据。
     datanode获取了需要recovery的block的数据,会遍历targets datanode进行recovery操作,recovery结束会按照最小块的长度进行截取、更新块信息和元信息。

 

第一步:namenode租约检查,准备recovery block 数据

LeaseManager.Monitor.checkLeases 关于租约过程有专门一节讲解。
(1)从namenode内存中找到该filePath对应的文件INode,通常这个时候该INode是一个INodeFileUnderConstruction的实例,表示这个文件是正在被写入,还没有complete的一个文件。
(2)如果该文件的Targets为空,且该文件的block队列为空,表示这个文件是个空文件,那么直接将该文件complete,删除其对应的lease记录,然后返回
(3)如果该文件的Targets为空,且该文件的block队列非空,那么获取该文件的block队列,并找到该队列的最后一个block,将该block的最后一个block对应的 datanode设置为该文件的targets,这个操作的原因在于:由于HDFS的文件只能向最后一个block写入输入,所以lease过期肯定是出 了最后一个block有问题外,其他block应该都是完整的,所以获取最后一个block。而targets表示最后一个block应该保存在哪几个 datanode上,该targets是一个datanode队列,也就是说,namenode知道这最后一个block是在这几台datanode 上,以便向这几个datanode发送block recovery命令
(4)在targets队列中选择一个datanode作为primary datanode,把该文件的最后一个block和targets datanode数组加入到primary datanode的recoverBlocks中。重点关注recoverBlocks这个队列
(5)修改Lease Holder为'NN_Recovery'并续组

 

 
 //源码一:
 if(pendingFile.getTargets()==null||pendingFile.getTargets().length== 0){
      if (pendingFile.getBlocks().length == 0) {
        finalizeINodeFileUnderConstruction(src, pendingFile);
        return;
      }
      // setup the Inode.targets for the last block from the blocksMap
      //最后一个block对应的datanode设置为该文件的targets 
      Block[] blocks = pendingFile.getBlocks();
      Block last = blocks[blocks.length-1];
      DatanodeDescriptor[] targets =  new DatanodeDescriptor[blocksMap.numNodes(last)];
      Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
      for (int i = 0; it != null && it.hasNext(); i++) {
        targets[i] = it.next();
      }
      pendingFile.setTargets(targets);
    }
    
    //指定primary datanode,start lease recovery of the last block for this file.
    pendingFile.assignPrimaryDatanode();
    
    //再分配Lease 注意Holder NN_Recovery
    Lease reassignedLease = reassignLease(lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
    
    //续约
    leaseManager.renewLease(reassignedLease);

   //源码2:
   //将该INodeFileUnderConstruction文件分配给指定的客户端进程,也就是执行租约恢复的操作为该文件初始化租约的恢复的处理(存储选择的主Datanode所激活的块列表) ,该过程会挑选一个目前还活着的DataNode,作为租约的主节点,并把<block,block目标     DataNode数组>加到该DataNode的recoverBlocks队列中;
  void assignPrimaryDatanode() {
    //assign the first alive datanode as the primary datanode
    // 指派第一个活跃的为主Datanode结点  
    if (targets.length == 0) {
      NameNode.stateChangeLog.warn("BLOCK*"
        + " INodeFileUnderConstruction.initLeaseRecovery:"
        + " No blocks found, lease removed.");
    }
    int previous = primaryNodeIndex;  
    //primaryNodeIndex初始值是-1,它用来保证每次找到的primary不在同一个位置
    // 从索引previous开始查找到一个活跃的Datanode进程  
    for(int i = 1; i <= targets.length; i++) {  
      int j = (previous + i)%targets.length;  
      if (targets[j].isAlive) { // 保证第j个Datanode处于活跃状态  

        DatanodeDescriptor primary = targets[primaryNodeIndex = j];  
 
        //把该文件的最后一个block和targets datanode数组加入到 
	  //primary datanode 的recoverBlocks中

        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets); 
        //存储被主Datanode激活的块,实际存储到该Datanode的块队列中  
        
        NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1] + " recovery started, primary=" + primary);  
        return;  
      }  
    }  
  }

 

第二步:datanode心跳获取recovery block

     在 HDFS中,datanode是会每隔几秒钟向namenode定期的发送心跳,namenode会返回给datanode一个“命令集(cmds)”的,这个命令集就是namenode需要 datanode执行的某些操作,比如
    (1)将该datanode上的某个block拷贝到其他datanode上去的DNA_TRANSFER命令
    (2)将该datanode上的某个block从物理磁盘上删除的DNA_TRANSFER命令
    (3)停止该datanode的DNA_SHUTDOWN命令
    (4)对某个block进行block recovery的DNA_RECOVERBLOCK命令
         

 cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);

 

 

       从上可以看出,从heartbeat的返回命令集中,就包括了对某个block进行recovery的命令。所以,datanode某个block进行 recovery操作的动作,实际上是来自namenode的指令。也就是说,namenode认为这个block需要做recovery了,并且这个 block在某几个datanode上保存,那么namenode就会在这几个datanode的heartbeat发送过来后,给这几个 datanode返回指令集,指令集中就包括对这个block进行recovery的指令。于是datanode接受到这个指令后,对block进行数据 本身的recovery操作。
    注意: BlockCommand数据结构,它存储了需要执行的操作(action)以及这个操作涉及的block和这个block所对应的所有的datanode。

 

public class BlockCommand extends DatanodeCommand {
  		Block blocks[];
  		DatanodeInfo targets[][];
}

 

第三步:datanode执行 block recovery

datanode上最终会调用recoverBlock方法,此时closeFile=true。它是recoverLease发起的,此时要关闭文件,并使得这个文件的block在datanode上的信息一致。

 

Public Daemon recoverBlocks(final Block[] blocks,final DatanodeInfo[][]targets){
    Daemon d = new Daemon(threadGroup, new Runnable() {
      /** Recover a list of blocks. It is run by the primary datanode. */
      public void run() {
        for(int i = 0; i < blocks.length; i++) {
          try {
            logRecoverBlock("NameNode", blocks[i], targets[i]);
            
            recoverBlock(blocks[i], false, targets[i], true);
          } catch (IOException e) {
            LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
          }
        }
      }
    });
    d.start();
    return d;
  }

 (1)由于block recovery 是由primary datanode发起,但该recovery操作需要在三个datanode上对该block进行操作(假设文件副本为3),所以primary datanode接收到命令的时候同时还收到了该block的targets datanode数组(其中就包括该datanode自身)
(2)primary datanode遍历targets datanode数组,对每一个datanode,向其发送一个start block recovery的指令。判断本地调用还是远程RPC,如果是其自身,则直接执行该指令。
(3)start block recovery指令会在datanode的磁盘中找到该block的物理块,并确认该block对应的验证信息和meta信息正确,并返回一个BlockRecord对象,表示这个block正在被recovery。
(4)对每个BlockRecord,查看keepLength标志位是否为true,如果为true,则只recovery blocksize 跟 namenode中记录的blocksize一致的block,否则全部都算。并且block的size为BlockRecord最小的。
(5)对每个物理块,一旦真正开始recovery操作,则进行如下操作:在该datanode上找到该block,同时找到这个block对应的meta文件 (每一个block都对应一个meta文件,用来记录该block的验证码等原信息),更新该block的stamp号(表示该block已经被修改过一 次)

 

(6)如果需要recovery成的block的size 小于实际的block的size,则将实际的block截断成其需要的大小,并更新meta文件和验证信息。

 

第四步:datanode处理recovery结果

primary datanode调用各个BlockRecord对应的datanode进行Block同步,然后向namenode提交块同步信息。

for(BlockRecord r : syncList) {
      try {
        r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile);
        successList.add(r.id);
      } catch (IOException e) {
        InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
            + newblock + ", datanode=" + r.id + ")", e);
      }
    }
 	
namenode.commitBlockSynchronization(block,newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,nlist);

 

(1)updateBlock 更新Block

  updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。

public void updateBlock(Block oldblock, Block newblock) throws IOException {
    if (oldblock.getBlockId() != newblock.getBlockId()) {
      throw new IOException("Cannot update oldblock (=" + oldblock
          + ") to newblock (=" + newblock + ").");
    }


    // Protect against a straggler updateblock call moving a block backwards
    // in time.
    boolean isValidUpdate =
      (newblock.getGenerationStamp() > oldblock.getGenerationStamp()) ||
      (newblock.getGenerationStamp() == oldblock.getGenerationStamp() &&
       newblock.getNumBytes() == oldblock.getNumBytes());

    if (!isValidUpdate) {
      throw new IOException(
        "Cannot update oldblock=" + oldblock +
        " to newblock=" + newblock + " since generation stamps must " +
        "increase, or else length must not change.");
    }

    
    for(;;) {
      final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
      if (threads == null) {
        return;
      }
     
      interruptAndJoinThreads(threads);
    }
  }

 

 

  /**
   * Try to update an old block to a new block.
   * If there are ongoing create threads running for the old block,
   * the threads will be returned without updating the block. 
   * 用于将旧块截断成新块(调用truncateBlock),并截断相应的元数据文件,以及更新ongoingCreates、volumeMap。
   * @return ongoing create threads if there is any. Otherwise, return null.
   */
  private synchronized List<Thread> tryUpdateBlock(Block oldblock, Block newblock) throws IOException {
    //check ongoing create threads 获取与此块相关的文件及访问这个块的线程
    ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
    if (activeThreads != null) {
      return activeThreads; //如果近期有对此块进行操作,返回存活的操作线程
    }
    //近期无对此块操作的线程,就更新块  
    
    ////获得旧块的文件  
    //No ongoing create threads is alive.  Update block.
    File blockFile = findBlockFile(oldblock.getBlockId());
    if (blockFile == null) {
      throw new IOException("Block " + oldblock + " does not exist.");
    }

    File oldMetaFile = findMetaFile(blockFile);
    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
    
    // First validate the update
    
    //update generation stamp
    //旧块stamp比新块stamp大,不合法  
    if (oldgs > newblock.getGenerationStamp()) {
      throw new IOException("Cannot update block (id=" + newblock.getBlockId()
          + ") generation stamp from " + oldgs
          + " to " + newblock.getGenerationStamp());
    }
    
    //update length
    //新块的大小大于旧块的大小  
    if (newblock.getNumBytes() > oldblock.getNumBytes()) {
      throw new IOException("Cannot update block file (=" + blockFile
          + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
    }

    // Now perform the update

    //rename meta file to a tmp file
    //旧块元数据文件重命名  
    File tmpMetaFile = new File(oldMetaFile.getParent(),oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
    if (!oldMetaFile.renameTo(tmpMetaFile)){
      throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
    }

    //新块的大小小于旧块的大小 截断旧块和旧块的元数据文件
    if (newblock.getNumBytes() < oldblock.getNumBytes()) {
      truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
    }

    //rename the tmp file to the new meta file (with new generation stamp)
    File newMetaFile = getMetaFile(blockFile, newblock);
    if (!tmpMetaFile.renameTo(newMetaFile)) {
      throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
    }

    updateBlockMap(ongoingCreates, oldblock, newblock);
    updateBlockMap(volumeMap, oldblock, newblock);

    // paranoia! verify that the contents of the stored block 
    // matches the block file on disk.
    validateBlockMetadata(newblock);
    return null;
  }

  //truncateBlock对旧块blockFile和对应的元数据文件metaFile进行截断,截断后旧块长度为newlen(newlen<oldlen)。  
  static void truncateBlock(File blockFile, File metaFile,long oldlen, long newlen) throws IOException {
    if (newlen == oldlen) {
      return;
    }
    if (newlen > oldlen) {
      throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
          + ") to newlen (=" + newlen + ")");
    }

    if (newlen == 0) {
      // Special case for truncating to 0 length, since there's no previous
      // chunk.
      RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
      try {
        //truncate blockFile 
        blockRAF.setLength(newlen);   
      } finally {
        blockRAF.close();
      }
      //update metaFile 
      RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
      try {
        metaRAF.setLength(BlockMetadataHeader.getHeaderSize());
      } finally {
        metaRAF.close();
      }
      return;
    }
    
    //由于只是对就块进行截断,所有新块的最后一个校验和字段可能在旧块中不一样,  
    //所有setLength进行截断后,要读取最后一个校验和字段  
    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
    int checksumsize = dcs.getChecksumSize();
    int bpc = dcs.getBytesPerChecksum();
    long newChunkCount = (newlen - 1)/bpc + 1;//校验和的段数  
    long newmetalen = BlockMetadataHeader.getHeaderSize() + newChunkCount*checksumsize;//新的校验和文件的长度  
    long lastchunkoffset = (newChunkCount - 1)*bpc;//最后一个校验和字段的偏移位置  
    int lastchunksize = (int)(newlen - lastchunkoffset); //最后一个校验和的开始位置  
    byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 

    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");//对旧块进行读取  
    try {
      //truncate blockFile 
      blockRAF.setLength(newlen);
 
      //read last chunk
      blockRAF.seek(lastchunkoffset);
      blockRAF.readFully(b, 0, lastchunksize);
    } finally {
      blockRAF.close();
    }

    //compute checksum
    dcs.update(b, 0, lastchunksize);
    dcs.writeValue(b, 0, false);

    //update metaFile 
    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
    try {
      metaRAF.setLength(newmetalen);
      metaRAF.seek(newmetalen - checksumsize);
      metaRAF.write(b, 0, checksumsize);
    } finally {
      metaRAF.close();
    }
  }

 

 

(2) commitBlockSynchronization

  参数分别是block,数据块;newgenerationstamp,新的时间戳;newlength,新长度;closeFile,是否关闭文件,deleteblock,是否删除文件;newtargets,新的目标列表。
  处理流程:
  参数检查,获取对应的文件,记为pendingFile;从BlocksMap中删除老的信息;如果deleteblock为true,从pendingFile删除Block记录;否则,更新Block的信息;如果不关闭文件,那么写日志保存更新,返回;最后如果关闭文件的话,调用finalizeINodeFileUnderConstruction。

 

 

分享到:
评论

相关推荐

    hadoop-hdfs-client-2.9.1-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-client-2.9.1.jar 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-client-2.9.1-javadoc-...

    hadoop-hdfs-client-2.9.1-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-client-2.9.1.jar; 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-client-2.9.1.pom;...

    hadoop-hdfs-2.7.3-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.5.1-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.7.3-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.5.1-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.6.5-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.9.1-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.9.1.jar 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-2.9.1-javadoc-API文档-中文(简体)版.zip 对应...

    hadoop-hdfs-2.9.1-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.9.1.jar; 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.9.1.pom; 包含翻译后的API文档:hadoop...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop源码分析-HDFS&MapReduce

    在Hadoop这个分布式计算框架中,HDFS(Hadoop Distributed File System)和MapReduce是两个核心组件,它们共同构建了大数据处理的基础架构。HDFS提供了高容错性的分布式存储,而MapReduce则提供了大规模数据集的并行...

    Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码

    Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】--...

    hadoop-hdfs-2.4.1.jar

    hadoop-hdfs-2.4.1.jar

    hadoop-hdfs-2.7.3

    hadoop-hdfs-2.7.3搭建flume1.7需要用到的包,还有几个包也有提供

    hadoop-hdfs-2.7.7.jar

    flume 想要将数据输出到hdfs,必须要有hadoop相关jar包。本资源是hadoop 2.7.7版本

    hadoop-hdfs-2.2.0.jar

    hadoop-hdfs-2.2.0.jar 点击下载资源即表示您确认该资源不违反资源分享的使用条款

    hadoop-hdfs-test-0.21.0.jar

    hadoop-hdfs-test-0.21.0.jar

Global site tag (gtag.js) - Google Analytics