第四节:namenode分析
4.3 namenode 副本监控分析ReplicationMonitor
ReplicationMonitor主要有两个作用:
(1)负责为副本不足的数据块选择source 数据节点,选择冗余的target节点,等待DN节点下次心跳将这些工作带回给相应的DN执行块冗余操作。
(2)将各个数据节点上无效的数据块副本加入无效集合,等待下次心跳将这些工作带回给相应的DataNode执行删除无效块操作。
默认每3s执行一次,可以通过修改dfs.replication.interval来调整执行间隔
(1)computeDatanodeWork
计算datanode需要处理的replication数量,主要包括当前超时挂起的replication,需要进行复制的replication,计 划处理的replication,损坏的replication。将这些数据记录下来,在下次heartbeat时候通知给datanode处理。
(2)processPendingReplications
处理超时挂起的replication,将超时的replication加入到需要进行replication操作的队列中
重要的控制参数:
//限制每一次处理无效Blocks的数据节点数量 static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32; //限制每一次处理冗余Blocks的数据节点数量 static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
(1)UnderReplicatedBlocks neededReplications
需要进行复制的数据块。UnderReplicatedBlocks是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先 级是0), 数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。类 UnderReplicatedBlocks的一个实例,负责存储需要冗余副本的块集合, 副本可能被冗余1份或多份;在数据块报告的时候,检查到块副本数量不够,就会将数据块加入该实例存储;在ReplicationMonitor中,将 block及其target加入replicateBlocks队列后,将该块从neededReplications中删除;
(2)PendingReplicationBlocks pendingReplications
描述当前尚未完成块副本复制的块的列表。因为HDFS支持块的流水线复制,当文件系统客户端开始向第一个Datanode复制数据块的时候,会在第一个 Datanode上启动复制进程,将该结点上接收到的(部分),数据块开始向第二个Datanode上传输复制,而第二个Datanode结点又向第三个 Datanode结点进行流水线复制,……,直到满足副本因子要求。所以,在执行流水线复制的过程中,因为这种可并行的复制方式使得某些块副本的复制完成 时间呈现阶梯状,也就是使用一个数据结构来保存这些尚未复制完成的块副本pendingReplications保存了所有正在进行复制的数据块,使用 Map是需要一些附加的信息PendingBlockInfo。这些信息包括时间戳, 用于检测是否已经超时,和现在进行复制的数目numReplicasInProgress。timedOutItems是超时的复制项,超时的复制项在 FSNamesystem的processPendingReplications方法中被删除,并从新复制。timerThread是用于检测复制超时 的线程的句柄,对应的线程是PendingReplicationMonitor的一个实例,它的run方法每隔一段会检查是否有超时的复制项,如果有, 将该数据块加到timedOutItems中。Timeout是run方法的检查间隔,defaultRecheckInterval是缺省值。在 ReplicationMonitor中,又会将timedOutItems中的数据块重新加入neededReplications;
我们分两个方面一一分析:
(1)块冗余处理机制
块冗余处理机制分为四个步骤:选择需要冗余的数据块、为数据块选择源节点、为数据块选择目标节点、将块和目标节点加入待冗余的队列。下面一一分析:
第一步:获取一定量需要冗余的数据块
当DN节点报告块的时候,NN会判断块的冗余是否足够,如果不足可能有几种情况:副本可能1份或者2份,如果副本仅一份,则需要冗余的优先级就相对更高, 则会将该块放入优先级更高的队列,副本已经有2份的块则会放入优先级更低点的队列,编号越低,一共分为3个优先级。
UnderReplicatedBlocks 主要的成员是一个优先队列,List<TreeSet<Block>> priorityQueues,保存副本数没有达到期望值的block。根据当前副本数低于期望值的程度决定优先级,差的越远,优先级越大(0~2,0最 大)。每个优先级别对应一个TreeSet,getPriority获得优先级后决定放入哪个treeset中。
对于冗余工作,根据DN节点数量不同,每次处理的数据块也会不同,比如当前集群有10个DN 节点,则默认情况下,ReplicationMonitor一个周期只会从neededReplications中取出20个数据块进行处理,之所以 HDFS要这样设计,主要是考虑到如果一次冗余的数据块太多,势必会对DN节点的网络造成比较大的影响,因此根据不同的DN集群规模决定一次冗余的数据块 数量。在具体冗余的过程中,首先按照优先级从优先级队列中拿一部分数据(如前面提及的20)块到replicateBlocks中,并且每次记录下读取的 优先级队列的偏移量,下次从该偏移指针处真正读取块信息。
以上工作均在 chooseUnderReplicatedBlocks中分四步完成:
synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) { // initialize data structure for the return value //1 初始化三级数组 List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL); for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) { blocksToReplicate.add(new ArrayList<Block>()); } synchronized(neededReplications) { if (neededReplications.size() == 0) { missingBlocksInCurIter = 0; missingBlocksInPrevIter = 0; return blocksToReplicate; } //2.跳过已经处理的数据 replIndex为游标 // Go through all blocks that need replications. BlockIterator neededReplicationsIterator = neededReplications.iterator(); // skip to the first unprocessed block, which is at replIndex for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) { neededReplicationsIterator.next(); } // # of blocks to process equals either twice the number of live // data-nodes or the number of under-replicated blocks whichever is less //要取的block 和 neededReplications 数量 取较小值 blocksToProcess = Math.min(blocksToProcess, neededReplications.size()); //3.迭代获取需要冗余的Block,根据优先级放入不同的数据 for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) { if( ! neededReplicationsIterator.hasNext()) { // start from the beginning replIndex = 0; missingBlocksInPrevIter = missingBlocksInCurIter; missingBlocksInCurIter = 0; blocksToProcess = Math.min(blocksToProcess, neededReplications.size()); if(blkCnt >= blocksToProcess) break; neededReplicationsIterator = neededReplications.iterator(); assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty."; } Block block = neededReplicationsIterator.next(); int priority = neededReplicationsIterator.getPriority(); if (priority < 0 || priority >= blocksToReplicate.size()) { LOG.warn("Unexpected replication priority: " + priority + " " + block); } else { blocksToReplicate.get(priority).add(block); } } // end for } // end synchronized return blocksToReplicate; }
第二步:选择源节点
private DatanodeDescriptor chooseSourceDatanode(Block block,List<DatanodeDescriptor> containingNodes,NumberReplicas numReplicas); 依赖的数据结构 (a)块(Block)到其元数据的映射表,元数据信息包括块所属的inode、存储块的Datanode BlocksMap blocksMap (b)保存损坏(如:校验没通过)的数据块到对应DataNode的关系 CorruptReplicasMap corruptReplicas (c)保存Datanode上有效但需要删除的数据块(StorageID -> TreeSet<Block>) Map<String, Collection<Block>> excessReplicateMap
主要逻辑:
(a)判断在某DataNode上副本是否损坏,如果损坏,遍历下个DataNode节点;
(b)判断在某DataNode对于的replicateBlocks中,是否已经有大于等于2个数据块了,如果是这样,遍历下一个DataNode节点,之所以这样实现,防止单DataNode流量过高;
(c)如果某DataNode节点已经退役,遍历下一个DataNode节点;
(d)如果某DataNode节点正在退役,将该DataNode作为source,而这也是HDFS设计的目的,这样的节点不会让出现因为读写数据块而 网络拥 塞,因此也不会让网络那么忙,当然即使是正在退役的节点,replicateBlocks中的块数量也不能超过2个(防止网络拥塞);如果有多个节点正在 退役,会选择最后一个退役节点作为该块的source节点;
(e)如果没有正在退役的节点,随意选择一个DN节点作为source节点;
第三步:选择目标节点 目前存储了该块副本的DN节点不会再作为target节点,至于选择target节点的数目,根据需要再冗余的块副本数量而定,比如还需要冗余2个副 本,则会选择2个target节点,选择的方式,依然采用机架感知相关算法。该算法我会专门用一节来详细讲述。
第四步:将块和目标节点加入待冗余的队列
srcNode.addBlockToBeReplicated(block, targets);
注意:DatanodeDescriptor中的BlockQueue replicateBlocks = new BlockQueue();
(2)块无效处理机制
这种类型数据块对于集群没有作用,如果不删除,会占用磁盘空间。在几种情况下会产生无效块:
(a)执行删除文件的时候,会首先快速地删除元数据,文件所对应的数据块就变得无效;
(b)Client上传数据块到DataNode的时候,可能由于网络原因等,数据块上传一部分后失败,这些块成了无效块;
(c) 在HDFS做rebalance操作的时候,会将负载较重的DataNode上部分块文件复制到其他负载较轻的DataNode上,数据块复制成功后,负 载较轻的DataNode上的该数据块就成了无效块;处理的无效块数量也是有限制的:数量为活着的dn的32%, 即 nodesToProcess = (int)Math.ceil((double)heartbeats.size() * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100); 在该流程中,也并不是无效块集合中有多少块,NameNode就一次性让DataNode把相应的无效块删除完,而是每次均删除一部分,这样的话不会造成 DataNode因为删除块而带宽,CPU资源等占用过大。
依赖数据结构: Map<String, Collection<Block>> recentInvalidateSets ,保存了每个DataNode上有效,但需要删除的数据块(StorageID >> TreeSet<Block>)。
流程分析:
(a)周期性调用FSNameSystem的computeDatanodeWork方法
(b)computeDatanodeWork调用computeInvalidateWork方法
(c)循环调用invalidateWorkForOneNode方法,直到达到预设值或全部处理完
(d)invalidateWorkForOneNode方法中,从recentInvalidateSets结构中获得第一个datanode节点,找到 对应的DatanodeDescriptro,同时将其所有对应invalidate block遍历出
(e)将遍历出的invalidate block封装调用addBlocksToBeInvalidated方法,在下一次心跳汇报时返回给datanode进行相应处理
相关推荐
赠送jar包:hadoop-hdfs-client-2.9.1.jar 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-client-2.9.1-javadoc-...
赠送jar包:hadoop-hdfs-client-2.9.1.jar; 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-client-2.9.1.pom;...
赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.9.1.jar; 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.9.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.9.1.jar 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-2.9.1-javadoc-API文档-中文(简体)版.zip 对应...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
1. **NameNode与DataNode**:HDFS的核心组件包括NameNode和DataNode。NameNode作为元数据管理节点,存储文件系统的命名空间信息和文件的块映射信息。DataNode则是数据存储节点,负责存储实际的数据块。源码中,...
Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】--...
hadoop-hdfs-2.4.1.jar
hadoop-hdfs-2.7.3搭建flume1.7需要用到的包,还有几个包也有提供
flume 想要将数据输出到hdfs,必须要有hadoop相关jar包。本资源是hadoop 2.7.7版本
hadoop-hdfs-2.2.0.jar 点击下载资源即表示您确认该资源不违反资源分享的使用条款
hadoop-hdfs-test-0.21.0.jar