第七章:小朱笔记hadoop之源码分析-hdfs分析
第五节:Datanode 分析
5.4 DataBlockScanner 文件校验
由于每一个磁盘或者是网络上的I/O操作可能会对正在读写的数据处理不慎而出现错误,所以HDFS提供了下面两种数据检验方式,以此来保证数据的完整性,而且这两种检验方式在DataNode节点上是同时工作的:
(1)校验和
检测损坏数据的常用方法是在第一次进行系统时计算数据的校验和,在通道传输过程中,如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是被损坏的。
(2)数据块检测程序(DataBlockScanner)
在DataNode节点上开启一个后台线程,来定期验证存储在它上所有块,这个是防止物理介质出现损减情况而造成的数据损坏。
关于校验和,HDFS以透明的方式检验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。正对数据的每一个校验块,都会创建一个单独的校验 和,默认校验块大小是512字节,对应的校验和是4字节。DataNode节点负载在存储数据(当然包括数据的校验和)之前验证它们收到的数据, 如果此 DataNode节点检测到错误,客户端会收到一个CheckSumException。客户端读取DataNode节点上的数据时,会验证校验和,即将 其与DataNode上存储的校验和进行比较。每一个DataNode节点都会维护着一个连续的校验和和验证日志,里面有着每一个Block的最后验证时 间。客户端成功验证Block之后,便会告诉DataNode节点,Datanode节点随之更新日志。
DataBlockScanner是在一个单独的线程里面进行执行,作用是周期性的对block进行校验,当DFSClient读取时,也会通知DataBlockScanner校验结果。
DataBlockScanner最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量,最小扫描速度是1 MB/s,默认扫描周期是21天,扫描周期可通过dfs.datanode.scan.period.hours来设置。
该类中的属性TreeSet<BlockScanInfo> blockInfoSet用来保存block的扫描时间,离现在时间间隔最长的排在首位,扫描的过程如下:
检查blockInfoSet中的第一个block的最后扫描时间距离现在是否超过扫描周期,如果不超过,休眠一定时间然后开始下次检查,如果扫描周期, 那么对该block进行校验,校验使用BlockSender来读取,读取的数据输出到 NullOutputStream,我们知道BlockSender在读取数据时,可以检查checksum,以此来判断是否校验成功。如果校验失败,进 行第二次校验,如果两次都失败,说明该block有错误,通知namenode。
与扫描日志相关的参数有:
最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量 最小扫描速度是1 MB/s 默认扫描周期是3周,扫描周期可通过配置${dfs.datanode.scan.period.hours}来设置
共有两个日志:当前日志,文件后缀是.curr;前一个日志,文件后缀是.prev
minRollingPeriod:日志最小滚动周期是6小时
minWarnPeriod:日志最小警告周期是6小时,在一个警告周期内只有发出一个警告
minLineLimit:日志最小行数限制是1000
采用滚动日志方式,只有当前行数curNumLines超过最大行数maxNumLines,并且距离上次滚动日志的时间超过minRollingPeriod时,才将dncp_block_verification.log.curr重命名为dncp_block_verification.log.prev,将新的日志写到dncp_block_verification.log.curr中。
class DataBlockScanner implements Runnable { public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec static final long DEFAULT_SCAN_PERIOD_HOURS = 21 * 24L; // three weeks private static final long ONE_DAY = 24 * 3600 * 1000L; static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); static final String verificationLogFile = "dncp_block_verification.log"; static final int verficationLogLimit = 5; // * numBlocks. //一个扫描周期,可以由Datanode的配置文件来设置,配置项是:dfs.datanode.scan.period.hours,单位是小时,默认的值是21*24*60*60*1000 ms private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000; DataNode datanode; //数据块管理器 FSDataset dataset; // sorted set //数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到期的数据块; TreeSet<BlockScanInfo> blockInfoSet; //数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息; HashMap<Block, BlockScanInfo> blockMap; long totalScans = 0; long totalVerifications = 0; // includes remote verification by clients. long totalScanErrors = 0; long totalTransientErrors = 0; long currentPeriodStart = System.currentTimeMillis(); //一个扫描周期中还剩下需要扫描的数据量; long bytesLeft = 0; // Bytes to scan in this period //一个扫描周期中需要扫描的总数据量; long totalBytesToScan = 0; //数据块的扫描验证日志记录器; private LogFileHandler verificationLog; Random random = new Random(); //扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量; BlockTransferThrottler throttler = null; private static enum ScanType { REMOTE_READ, // Verified when a block read by a client etc VERIFICATION_SCAN, // scanned as part of periodic verfication NONE, } ...... }
DataBlockScanner被DataNode节点用来检测它所管理的所有Block数据块的一致性,因此,对已DataNode节点上的每一个 Block,它都会每隔scanPeriod ms利用Block对应的校验和文件来检测该Block一次,看看这个Block的数据是否已经损坏。由于scanPeriod 的值一般比较大,因为对DataNode节点上的每一个Block扫描一遍要消耗不少系统资源,这就可能带来另外一个问题就是在一个扫描周 期内可能会出现DataNode节点重启的情况,所以为了提供系统性能,避免DataNode节点在启动之后对还没有过期的Block又扫描一 遍,DataBlockScanner在其内部使用了日志记录器来持久化保存每一个Block上一次扫描的时间,这样的话, DataNode节点在启动之后通过日志文件来恢复之前所有Block的有效时间。另外,DataNode为了节约系统资源,它对Block的验证不仅仅 只依赖于DataBlockScanner后台线程(VERIFICATION_SCAN方式),还会在向某一个客户端传送Block的时候来更行该 Block的扫描时间(REMOTE_READ方式),这是因为DataNode向客户端传送一个Block的时候要必须校验该数据块。那么这个时候日志 记录器并不会马上把该数据块的扫描信息写到日志,毕竟频繁的磁盘I/O会导致性能下降,至于何时对该Block的最新扫描时间写日志有一个判断条件: 1.如果是VERIFICATION_SCAN方式的Block验证,必须记日志; 2.如果是REMOTE_READ方式,那么该Block上一次的记录日志到现在的时间间隔超过24小时或者超过scanPeriod/3 ms 的话,记日志。 下面来结合源码详细讨论这个过程:
public void run() { try { //初始化 //1.为每个Block创建BlockScanInfo //2.创建 日志扫描器 LogFileHandler //3.创建 扫描速度控制器 BlockTransferThrottler init(); // Read last verification times //为每一个Block分配上一次验证的时间 if (!assignInitialVerificationTimes()) { return; } //调整扫描速度 adjustThrottler(); while (datanode.shouldRun && !Thread.interrupted()) { long now = System.currentTimeMillis(); synchronized (this) { if (now >= (currentPeriodStart + scanPeriod)) { //新周期 startNewPeriod(); } } if ((now - getEarliestScanTime()) >= scanPeriod) { //校验 verifyFirstBlock(); } else { try { Thread.sleep(1000); } catch (InterruptedException ignored) { } } } } catch (RuntimeException e) { LOG.warn("RuntimeException during DataBlockScanner.run() : " + StringUtils.stringifyException(e)); throw e; } finally { shutdown(); LOG.info("Exiting DataBlockScanner thread."); } }
(1)初始化init
初始化两个关键数据结构
blockInfoSet = new TreeSet<BlockScanInfo>(); blockMap = new HashMap<Block, BlockScanInfo>(); 为每个Block创建BlockScanInfo,创建日志扫描器 LogFileHandler ,创建扫描速度控制器 BlockTransferThrottler
(2)为每一个Block分配上一次验证的时间
BlockScanInfo info; while ((info = blockInfoSet.first()).lastScanTime < 0) { delBlockInfo(info); info.lastScanTime = lastScanTime; lastScanTime += verifyInterval; addBlockInfo(info); }
(3)调整扫描速度
在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的 磁盘I/O,为了不影响DataNode节点上其它线程的工作资源,同时也为了自身工作的有效性,所以DataBlockScanner采用了扫描验证速 度控制器,根据当前的工作量来控制当前数据块的验证速度。
//本次扫描验证还剩余的时间 long timeLeft = currentPeriodStart + scanPeriod - System.currentTimeMillis(); //根据本次验证扫描剩余的工作量和时间来计算速度 long bw = Math.max(bytesLeft * 1000 / timeLeft, MIN_SCAN_RATE); throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
(4)DataNode节点在向客户端或者其它DataNode节点传输数据时,客户端 或者其它DataNode节点会根据接收的数据校验和来验证接收到的数据,当验证出错时,它们会通知传送节点。DataBlockScanner通过自己 扮演传输者又扮演接受者来实现数据块的验证的;同时为了防止本地磁盘的I/O的错误,DataBlockScanner采用了两次传输-接收来确保验证的 Block的数据是出错了(损坏了)。当发现有出错的Block是,就需要向NameNode节点报告,由NameNode来决定如何处理这个数据块,而 不是由DataNode节点擅自作主清除该Block数据信息。
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode); DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream()); blockSender.sendBlock(out, null, throttler);
(5)错误处理
DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) }; LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; //向NameNode节点发送出错的Block datanode.namenode.reportBadBlocks(blocks);
相关推荐
赠送jar包:hadoop-hdfs-client-2.9.1.jar 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-client-2.9.1-javadoc-...
赠送jar包:hadoop-hdfs-client-2.9.1.jar; 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-client-2.9.1.pom;...
赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.9.1.jar; 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.9.1.pom; 包含翻译后的API文档:hadoop...
赠送jar包:hadoop-hdfs-2.9.1.jar 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-2.9.1-javadoc-API文档-中文(简体)版.zip 对应...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】--...
在Hadoop这个分布式计算框架中,HDFS(Hadoop Distributed File System)和MapReduce是两个核心组件,它们共同构建了大数据处理的基础架构。HDFS提供了高容错性的分布式存储,而MapReduce则提供了大规模数据集的并行...
hadoop-hdfs-2.4.1.jar
hadoop-hdfs-2.7.3搭建flume1.7需要用到的包,还有几个包也有提供
flume 想要将数据输出到hdfs,必须要有hadoop相关jar包。本资源是hadoop 2.7.7版本
hadoop-hdfs-2.2.0.jar 点击下载资源即表示您确认该资源不违反资源分享的使用条款
hadoop-hdfs-test-0.21.0.jar