第七章:小朱笔记hadoop之源码分析-hdfs分析
第九节:block Recovery过程分析
Lease Recovery Algorithm lease recovery算法:
1) Namenode retrieves lease information name node查找到lease的信息 2) For each file f in the lease, consider the last block b off 对于lease中的每一个文件,获取其最后一个block b进行以下处理 2.1) Get the datanodes which contains b 获取包含block b 的全部data node 2.2) Assign one of the datanodes as the primary datanode p 获取一个data node作为其primary data node 2.3) p obtains a new generation stamp form the namenode primary datanode向 nn获取一个新的generation stamp 2.4) p get the block info from each datanode primary datanode 向每一个dn获取block info 2.5) p computes the minimum block length primary datanode 计算最小块的长度 2.6) p updates the datanodes, which have a valid generation stamp,with the new generation stamp and the minimum block length primary datanode 用最小块的长度和新生成的 genetation stamp来更新 dn 2.7) p acknowledges the namenode the update results primary datanode ack nn update的结果 2.8) Namenode updates the BlockInfo nn 更新 block info 2.9) Namenode removes f from the lease and removes the lease once all files have been removed nn 删除文件 f 的lease 2.10) Namenode commit changes to edit log nn向edit log提交 lease 这个change
一个client在持有某个文件的Lease情况下,如果写入数据过程中发生宕机,或者其他事故,导致无法继续对文件进行写入。由于该文件的Lease是由namenode来维护的,此时namenode认为该文件正在被该client持有,所以其他client对该文件是不允许进行写入的。
为了解决上面的问题,namenode中对某个client对应某个文件的Lease是有一个限期的,一旦过了这 个限期,该Lease没有发生任何改变(比如更新时间),没有写入任何数据,那么namenode就认为该lease对应的client发生了异常,需要在 namenode端对这个Lease进行释放,以便其他的client能够对文件进行写入操作。释放Lease的时候会处理正在写入的文件,把该文件的最后一个block和targets datanode数组加入到需要recovery的队列中,进行处理之后等待目标datanode心跳获取该数据。
datanode获取了需要recovery的block的数据,会遍历targets datanode进行recovery操作,recovery结束会按照最小块的长度进行截取、更新块信息和元信息。
第一步:namenode租约检查,准备recovery block 数据
LeaseManager.Monitor.checkLeases 关于租约过程有专门一节讲解。
(1)从namenode内存中找到该filePath对应的文件INode,通常这个时候该INode是一个INodeFileUnderConstruction的实例,表示这个文件是正在被写入,还没有complete的一个文件。
(2)如果该文件的Targets为空,且该文件的block队列为空,表示这个文件是个空文件,那么直接将该文件complete,删除其对应的lease记录,然后返回
(3)如果该文件的Targets为空,且该文件的block队列非空,那么获取该文件的block队列,并找到该队列的最后一个block,将该block的最后一个block对应的 datanode设置为该文件的targets,这个操作的原因在于:由于HDFS的文件只能向最后一个block写入输入,所以lease过期肯定是出 了最后一个block有问题外,其他block应该都是完整的,所以获取最后一个block。而targets表示最后一个block应该保存在哪几个 datanode上,该targets是一个datanode队列,也就是说,namenode知道这最后一个block是在这几台datanode 上,以便向这几个datanode发送block recovery命令
(4)在targets队列中选择一个datanode作为primary datanode,把该文件的最后一个block和targets datanode数组加入到primary datanode的recoverBlocks中。重点关注recoverBlocks这个队列
(5)修改Lease Holder为'NN_Recovery'并续组
//源码一: if(pendingFile.getTargets()==null||pendingFile.getTargets().length== 0){ if (pendingFile.getBlocks().length == 0) { finalizeINodeFileUnderConstruction(src, pendingFile); return; } // setup the Inode.targets for the last block from the blocksMap //最后一个block对应的datanode设置为该文件的targets Block[] blocks = pendingFile.getBlocks(); Block last = blocks[blocks.length-1]; DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)]; Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last); for (int i = 0; it != null && it.hasNext(); i++) { targets[i] = it.next(); } pendingFile.setTargets(targets); } //指定primary datanode,start lease recovery of the last block for this file. pendingFile.assignPrimaryDatanode(); //再分配Lease 注意Holder NN_Recovery Lease reassignedLease = reassignLease(lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile); //续约 leaseManager.renewLease(reassignedLease); //源码2: //将该INodeFileUnderConstruction文件分配给指定的客户端进程,也就是执行租约恢复的操作为该文件初始化租约的恢复的处理(存储选择的主Datanode所激活的块列表) ,该过程会挑选一个目前还活着的DataNode,作为租约的主节点,并把<block,block目标 DataNode数组>加到该DataNode的recoverBlocks队列中; void assignPrimaryDatanode() { //assign the first alive datanode as the primary datanode // 指派第一个活跃的为主Datanode结点 if (targets.length == 0) { NameNode.stateChangeLog.warn("BLOCK*" + " INodeFileUnderConstruction.initLeaseRecovery:" + " No blocks found, lease removed."); } int previous = primaryNodeIndex; //primaryNodeIndex初始值是-1,它用来保证每次找到的primary不在同一个位置 // 从索引previous开始查找到一个活跃的Datanode进程 for(int i = 1; i <= targets.length; i++) { int j = (previous + i)%targets.length; if (targets[j].isAlive) { // 保证第j个Datanode处于活跃状态 DatanodeDescriptor primary = targets[primaryNodeIndex = j]; //把该文件的最后一个block和targets datanode数组加入到 //primary datanode 的recoverBlocks中 primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets); //存储被主Datanode激活的块,实际存储到该Datanode的块队列中 NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1] + " recovery started, primary=" + primary); return; } } }
第二步:datanode心跳获取recovery block
在 HDFS中,datanode是会每隔几秒钟向namenode定期的发送心跳,namenode会返回给datanode一个“命令集(cmds)”的,这个命令集就是namenode需要 datanode执行的某些操作,比如
(1)将该datanode上的某个block拷贝到其他datanode上去的DNA_TRANSFER命令
(2)将该datanode上的某个block从物理磁盘上删除的DNA_TRANSFER命令
(3)停止该datanode的DNA_SHUTDOWN命令
(4)对某个block进行block recovery的DNA_RECOVERBLOCK命令
cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
从上可以看出,从heartbeat的返回命令集中,就包括了对某个block进行recovery的命令。所以,datanode某个block进行 recovery操作的动作,实际上是来自namenode的指令。也就是说,namenode认为这个block需要做recovery了,并且这个 block在某几个datanode上保存,那么namenode就会在这几个datanode的heartbeat发送过来后,给这几个 datanode返回指令集,指令集中就包括对这个block进行recovery的指令。于是datanode接受到这个指令后,对block进行数据 本身的recovery操作。
注意: BlockCommand数据结构,它存储了需要执行的操作(action)以及这个操作涉及的block和这个block所对应的所有的datanode。
public class BlockCommand extends DatanodeCommand { Block blocks[]; DatanodeInfo targets[][]; }
第三步:datanode执行 block recovery
datanode上最终会调用recoverBlock方法,此时closeFile=true。它是recoverLease发起的,此时要关闭文件,并使得这个文件的block在datanode上的信息一致。
Public Daemon recoverBlocks(final Block[] blocks,final DatanodeInfo[][]targets){ Daemon d = new Daemon(threadGroup, new Runnable() { /** Recover a list of blocks. It is run by the primary datanode. */ public void run() { for(int i = 0; i < blocks.length; i++) { try { logRecoverBlock("NameNode", blocks[i], targets[i]); recoverBlock(blocks[i], false, targets[i], true); } catch (IOException e) { LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e); } } } }); d.start(); return d; }
(1)由于block recovery 是由primary datanode发起,但该recovery操作需要在三个datanode上对该block进行操作(假设文件副本为3),所以primary datanode接收到命令的时候同时还收到了该block的targets datanode数组(其中就包括该datanode自身)
(2)primary datanode遍历targets datanode数组,对每一个datanode,向其发送一个start block recovery的指令。判断本地调用还是远程RPC,如果是其自身,则直接执行该指令。
(3)start block recovery指令会在datanode的磁盘中找到该block的物理块,并确认该block对应的验证信息和meta信息正确,并返回一个BlockRecord对象,表示这个block正在被recovery。
(4)对每个BlockRecord,查看keepLength标志位是否为true,如果为true,则只recovery blocksize 跟 namenode中记录的blocksize一致的block,否则全部都算。并且block的size为BlockRecord最小的。
(5)对每个物理块,一旦真正开始recovery操作,则进行如下操作:在该datanode上找到该block,同时找到这个block对应的meta文件 (每一个block都对应一个meta文件,用来记录该block的验证码等原信息),更新该block的stamp号(表示该block已经被修改过一 次)
(6)如果需要recovery成的block的size 小于实际的block的size,则将实际的block截断成其需要的大小,并更新meta文件和验证信息。
第四步:datanode处理recovery结果
primary datanode调用各个BlockRecord对应的datanode进行Block同步,然后向namenode提交块同步信息。
for(BlockRecord r : syncList) { try { r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile); successList.add(r.id); } catch (IOException e) { InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + newblock + ", datanode=" + r.id + ")", e); } } namenode.commitBlockSynchronization(block,newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,nlist);
(1)updateBlock 更新Block
updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。
public void updateBlock(Block oldblock, Block newblock) throws IOException { if (oldblock.getBlockId() != newblock.getBlockId()) { throw new IOException("Cannot update oldblock (=" + oldblock + ") to newblock (=" + newblock + ")."); } // Protect against a straggler updateblock call moving a block backwards // in time. boolean isValidUpdate = (newblock.getGenerationStamp() > oldblock.getGenerationStamp()) || (newblock.getGenerationStamp() == oldblock.getGenerationStamp() && newblock.getNumBytes() == oldblock.getNumBytes()); if (!isValidUpdate) { throw new IOException( "Cannot update oldblock=" + oldblock + " to newblock=" + newblock + " since generation stamps must " + "increase, or else length must not change."); } for(;;) { final List<Thread> threads = tryUpdateBlock(oldblock, newblock); if (threads == null) { return; } interruptAndJoinThreads(threads); } }
/** * Try to update an old block to a new block. * If there are ongoing create threads running for the old block, * the threads will be returned without updating the block. * 用于将旧块截断成新块(调用truncateBlock),并截断相应的元数据文件,以及更新ongoingCreates、volumeMap。 * @return ongoing create threads if there is any. Otherwise, return null. */ private synchronized List<Thread> tryUpdateBlock(Block oldblock, Block newblock) throws IOException { //check ongoing create threads 获取与此块相关的文件及访问这个块的线程 ArrayList<Thread> activeThreads = getActiveThreads(oldblock); if (activeThreads != null) { return activeThreads; //如果近期有对此块进行操作,返回存活的操作线程 } //近期无对此块操作的线程,就更新块 ////获得旧块的文件 //No ongoing create threads is alive. Update block. File blockFile = findBlockFile(oldblock.getBlockId()); if (blockFile == null) { throw new IOException("Block " + oldblock + " does not exist."); } File oldMetaFile = findMetaFile(blockFile); long oldgs = parseGenerationStamp(blockFile, oldMetaFile); // First validate the update //update generation stamp //旧块stamp比新块stamp大,不合法 if (oldgs > newblock.getGenerationStamp()) { throw new IOException("Cannot update block (id=" + newblock.getBlockId() + ") generation stamp from " + oldgs + " to " + newblock.getGenerationStamp()); } //update length //新块的大小大于旧块的大小 if (newblock.getNumBytes() > oldblock.getNumBytes()) { throw new IOException("Cannot update block file (=" + blockFile + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes()); } // Now perform the update //rename meta file to a tmp file //旧块元数据文件重命名 File tmpMetaFile = new File(oldMetaFile.getParent(),oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp()); if (!oldMetaFile.renameTo(tmpMetaFile)){ throw new IOException("Cannot rename block meta file to " + tmpMetaFile); } //新块的大小小于旧块的大小 截断旧块和旧块的元数据文件 if (newblock.getNumBytes() < oldblock.getNumBytes()) { truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes()); } //rename the tmp file to the new meta file (with new generation stamp) File newMetaFile = getMetaFile(blockFile, newblock); if (!tmpMetaFile.renameTo(newMetaFile)) { throw new IOException("Cannot rename tmp meta file to " + newMetaFile); } updateBlockMap(ongoingCreates, oldblock, newblock); updateBlockMap(volumeMap, oldblock, newblock); // paranoia! verify that the contents of the stored block // matches the block file on disk. validateBlockMetadata(newblock); return null; } //truncateBlock对旧块blockFile和对应的元数据文件metaFile进行截断,截断后旧块长度为newlen(newlen<oldlen)。 static void truncateBlock(File blockFile, File metaFile,long oldlen, long newlen) throws IOException { if (newlen == oldlen) { return; } if (newlen > oldlen) { throw new IOException("Cannot truncate block to from oldlen (=" + oldlen + ") to newlen (=" + newlen + ")"); } if (newlen == 0) { // Special case for truncating to 0 length, since there's no previous // chunk. RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); try { //truncate blockFile blockRAF.setLength(newlen); } finally { blockRAF.close(); } //update metaFile RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); try { metaRAF.setLength(BlockMetadataHeader.getHeaderSize()); } finally { metaRAF.close(); } return; } //由于只是对就块进行截断,所有新块的最后一个校验和字段可能在旧块中不一样, //所有setLength进行截断后,要读取最后一个校验和字段 DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); int checksumsize = dcs.getChecksumSize(); int bpc = dcs.getBytesPerChecksum(); long newChunkCount = (newlen - 1)/bpc + 1;//校验和的段数 long newmetalen = BlockMetadataHeader.getHeaderSize() + newChunkCount*checksumsize;//新的校验和文件的长度 long lastchunkoffset = (newChunkCount - 1)*bpc;//最后一个校验和字段的偏移位置 int lastchunksize = (int)(newlen - lastchunkoffset); //最后一个校验和的开始位置 byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");//对旧块进行读取 try { //truncate blockFile blockRAF.setLength(newlen); //read last chunk blockRAF.seek(lastchunkoffset); blockRAF.readFully(b, 0, lastchunksize); } finally { blockRAF.close(); } //compute checksum dcs.update(b, 0, lastchunksize); dcs.writeValue(b, 0, false); //update metaFile RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); try { metaRAF.setLength(newmetalen); metaRAF.seek(newmetalen - checksumsize); metaRAF.write(b, 0, checksumsize); } finally { metaRAF.close(); } }
(2) commitBlockSynchronization
参数分别是block,数据块;newgenerationstamp,新的时间戳;newlength,新长度;closeFile,是否关闭文件,deleteblock,是否删除文件;newtargets,新的目标列表。
处理流程:
参数检查,获取对应的文件,记为pendingFile;从BlocksMap中删除老的信息;如果deleteblock为true,从pendingFile删除Block记录;否则,更新Block的信息;如果不关闭文件,那么写日志保存更新,返回;最后如果关闭文件的话,调用finalizeINodeFileUnderConstruction。
相关推荐
赠送jar包:hadoop-hdfs-client-2.9.1.jar 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-client-2.9.1-javadoc-...
赠送jar包:hadoop-hdfs-client-2.9.1.jar; 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-client-2.9.1.pom;...
赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.9.1.jar 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-2.9.1-javadoc-API文档-中文(简体)版.zip 对应...
赠送jar包:hadoop-hdfs-2.9.1.jar; 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.9.1.pom; 包含翻译后的API文档:hadoop...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
在Hadoop这个分布式计算框架中,HDFS(Hadoop Distributed File System)和MapReduce是两个核心组件,它们共同构建了大数据处理的基础架构。HDFS提供了高容错性的分布式存储,而MapReduce则提供了大规模数据集的并行...
Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】--...
hadoop-hdfs-2.4.1.jar
hadoop-hdfs-2.7.3搭建flume1.7需要用到的包,还有几个包也有提供
flume 想要将数据输出到hdfs,必须要有hadoop相关jar包。本资源是hadoop 2.7.7版本
hadoop-hdfs-2.2.0.jar 点击下载资源即表示您确认该资源不违反资源分享的使用条款
hadoop-hdfs-test-0.21.0.jar