`

第七章:小朱笔记hadoop之源码分析-hdfs分析 DataBlockScanner 文件校验

 
阅读更多

第七章:小朱笔记hadoop之源码分析-hdfs分析

第五节:Datanode 分析

5.4  DataBlockScanner 文件校验

由于每一个磁盘或者是网络上的I/O操作可能会对正在读写的数据处理不慎而出现错误,所以HDFS提供了下面两种数据检验方式,以此来保证数据的完整性,而且这两种检验方式在DataNode节点上是同时工作的:

 

(1)校验和

 

检测损坏数据的常用方法是在第一次进行系统时计算数据的校验和,在通道传输过程中,如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是被损坏的。

 

(2)数据块检测程序(DataBlockScanner)

 

在DataNode节点上开启一个后台线程,来定期验证存储在它上所有块,这个是防止物理介质出现损减情况而造成的数据损坏。

 

 

 

       关于校验和,HDFS以透明的方式检验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。正对数据的每一个校验块,都会创建一个单独的校验 和,默认校验块大小是512字节,对应的校验和是4字节。DataNode节点负载在存储数据(当然包括数据的校验和)之前验证它们收到的数据, 如果此 DataNode节点检测到错误,客户端会收到一个CheckSumException。客户端读取DataNode节点上的数据时,会验证校验和,即将 其与DataNode上存储的校验和进行比较。每一个DataNode节点都会维护着一个连续的校验和和验证日志,里面有着每一个Block的最后验证时 间。客户端成功验证Block之后,便会告诉DataNode节点,Datanode节点随之更新日志。

 

        DataBlockScanner是在一个单独的线程里面进行执行,作用是周期性的对block进行校验,当DFSClient读取时,也会通知DataBlockScanner校验结果。

 

        DataBlockScanner最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量,最小扫描速度是1 MB/s,默认扫描周期是21天,扫描周期可通过dfs.datanode.scan.period.hours来设置。

 

        该类中的属性TreeSet<BlockScanInfo> blockInfoSet用来保存block的扫描时间,离现在时间间隔最长的排在首位,扫描的过程如下:

 

        检查blockInfoSet中的第一个block的最后扫描时间距离现在是否超过扫描周期,如果不超过,休眠一定时间然后开始下次检查,如果扫描周期, 那么对该block进行校验,校验使用BlockSender来读取,读取的数据输出到 NullOutputStream,我们知道BlockSender在读取数据时,可以检查checksum,以此来判断是否校验成功。如果校验失败,进 行第二次校验,如果两次都失败,说明该block有错误,通知namenode。

 

 

 

      与扫描日志相关的参数有:

    最大扫描速度是8 MB/s,通过BlockTransferThrottler来限制流量  
    最小扫描速度是1 MB/s  
    默认扫描周期是3周,扫描周期可通过配置${dfs.datanode.scan.period.hours}来设置  

 

写道
日志文件名前缀是dncp_block_verification.log
共有两个日志:当前日志,文件后缀是.curr;前一个日志,文件后缀是.prev
minRollingPeriod:日志最小滚动周期是6小时
minWarnPeriod:日志最小警告周期是6小时,在一个警告周期内只有发出一个警告
minLineLimit:日志最小行数限制是1000

采用滚动日志方式,只有当前行数curNumLines超过最大行数maxNumLines,并且距离上次滚动日志的时间超过minRollingPeriod时,才将dncp_block_verification.log.curr重命名为dncp_block_verification.log.prev,将新的日志写到dncp_block_verification.log.curr中。

 

    class DataBlockScanner implements Runnable {  
      
      public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);  
      
      private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec  
      private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec  
      
      static final long DEFAULT_SCAN_PERIOD_HOURS = 21 * 24L; // three weeks  
      private static final long ONE_DAY = 24 * 3600 * 1000L;  
      
      static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");  
      
      static final String verificationLogFile = "dncp_block_verification.log";  
      static final int verficationLogLimit = 5; // * numBlocks.  
      
      //一个扫描周期,可以由Datanode的配置文件来设置,配置项是:dfs.datanode.scan.period.hours,单位是小时,默认的值是21*24*60*60*1000 ms  
        
      private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;  
      DataNode datanode;  
        
      //数据块管理器  
      FSDataset dataset;  
      
      // sorted set  
      //数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到期的数据块;  
      TreeSet<BlockScanInfo> blockInfoSet;  
        
      //数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息;  
      HashMap<Block, BlockScanInfo> blockMap;  
      
      long totalScans = 0;  
      long totalVerifications = 0; // includes remote verification by clients.  
      long totalScanErrors = 0;  
      long totalTransientErrors = 0;  
      
      long currentPeriodStart = System.currentTimeMillis();  
        
      //一个扫描周期中还剩下需要扫描的数据量;  
      long bytesLeft = 0; // Bytes to scan in this period  
        
      //一个扫描周期中需要扫描的总数据量;  
      long totalBytesToScan = 0;  
      
      //数据块的扫描验证日志记录器;  
      private LogFileHandler verificationLog;  
      
      Random random = new Random();  
      
      //扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量;  
      BlockTransferThrottler throttler = null;  
      
      private static enum ScanType {  
        REMOTE_READ, // Verified when a block read by a client etc  
        VERIFICATION_SCAN, // scanned as part of periodic verfication  
        NONE,  
      }  
      
      
    ......  
      
    }  

 

 

 

     

 

 

         DataBlockScanner被DataNode节点用来检测它所管理的所有Block数据块的一致性,因此,对已DataNode节点上的每一个 Block,它都会每隔scanPeriod ms利用Block对应的校验和文件来检测该Block一次,看看这个Block的数据是否已经损坏。由于scanPeriod 的值一般比较大,因为对DataNode节点上的每一个Block扫描一遍要消耗不少系统资源,这就可能带来另外一个问题就是在一个扫描周 期内可能会出现DataNode节点重启的情况,所以为了提供系统性能,避免DataNode节点在启动之后对还没有过期的Block又扫描一 遍,DataBlockScanner在其内部使用了日志记录器来持久化保存每一个Block上一次扫描的时间,这样的话, DataNode节点在启动之后通过日志文件来恢复之前所有Block的有效时间。另外,DataNode为了节约系统资源,它对Block的验证不仅仅 只依赖于DataBlockScanner后台线程(VERIFICATION_SCAN方式),还会在向某一个客户端传送Block的时候来更行该 Block的扫描时间(REMOTE_READ方式),这是因为DataNode向客户端传送一个Block的时候要必须校验该数据块。那么这个时候日志 记录器并不会马上把该数据块的扫描信息写到日志,毕竟频繁的磁盘I/O会导致性能下降,至于何时对该Block的最新扫描时间写日志有一个判断条件: 1.如果是VERIFICATION_SCAN方式的Block验证,必须记日志; 2.如果是REMOTE_READ方式,那么该Block上一次的记录日志到现在的时间间隔超过24小时或者超过scanPeriod/3 ms 的话,记日志。 下面来结合源码详细讨论这个过程:

 

 

    public void run() {  
       try {  
         //初始化   
         //1.为每个Block创建BlockScanInfo  
         //2.创建 日志扫描器 LogFileHandler   
         //3.创建 扫描速度控制器 BlockTransferThrottler  
         init();  
      
         // Read last verification times  
         //为每一个Block分配上一次验证的时间    
         if (!assignInitialVerificationTimes()) {  
           return;  
         }  
         //调整扫描速度  
         adjustThrottler();  
      
         while (datanode.shouldRun && !Thread.interrupted()) {  
           long now = System.currentTimeMillis();  
           synchronized (this) {  
             if (now >= (currentPeriodStart + scanPeriod)) {  
               //新周期  
               startNewPeriod();  
             }  
           }  
           if ((now - getEarliestScanTime()) >= scanPeriod) {  
             //校验  
             verifyFirstBlock();  
           } else {  
             try {  
               Thread.sleep(1000);  
             } catch (InterruptedException ignored) {  
             }  
           }  
         }  
       } catch (RuntimeException e) {  
         LOG.warn("RuntimeException during DataBlockScanner.run() : " + StringUtils.stringifyException(e));  
         throw e;  
       } finally {  
         shutdown();  
         LOG.info("Exiting DataBlockScanner thread.");  
       }  
     }  

 

 

(1)初始化init

 

 初始化两个关键数据结构

 

 blockInfoSet = new TreeSet<BlockScanInfo>();  
blockMap = new HashMap<Block, BlockScanInfo>();  
为每个Block创建BlockScanInfo,创建日志扫描器 LogFileHandler ,创建扫描速度控制器 BlockTransferThrottler  

 

 

(2)为每一个Block分配上一次验证的时间

 

    BlockScanInfo info;  
    while ((info = blockInfoSet.first()).lastScanTime < 0) {  
        delBlockInfo(info);  
        info.lastScanTime = lastScanTime;  
        lastScanTime += verifyInterval;  
        addBlockInfo(info);  
      }   

 

 

(3)调整扫描速度

 

 在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的 磁盘I/O,为了不影响DataNode节点上其它线程的工作资源,同时也为了自身工作的有效性,所以DataBlockScanner采用了扫描验证速 度控制器,根据当前的工作量来控制当前数据块的验证速度。

 

    //本次扫描验证还剩余的时间    
    long timeLeft = currentPeriodStart + scanPeriod - System.currentTimeMillis();  
    //根据本次验证扫描剩余的工作量和时间来计算速度    
    long bw = Math.max(bytesLeft * 1000 / timeLeft, MIN_SCAN_RATE);  
    throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));  

 

(4)DataNode节点在向客户端或者其它DataNode节点传输数据时,客户端 或者其它DataNode节点会根据接收的数据校验和来验证接收到的数据,当验证出错时,它们会通知传送节点。DataBlockScanner通过自己 扮演传输者又扮演接受者来实现数据块的验证的;同时为了防止本地磁盘的I/O的错误,DataBlockScanner采用了两次传输-接收来确保验证的 Block的数据是出错了(损坏了)。当发现有出错的Block是,就需要向NameNode节点报告,由NameNode来决定如何处理这个数据块,而 不是由DataNode节点擅自作主清除该Block数据信息。

 

    blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);  
    DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());  
    blockSender.sendBlock(out, null, throttler);  

 

(5)错误处理

    DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };  
    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };  
    //向NameNode节点发送出错的Block   
    datanode.namenode.reportBadBlocks(blocks);  

 

 

 

分享到:
评论

相关推荐

    hadoop-hdfs-client-2.9.1-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-client-2.9.1.jar 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-client-2.9.1-javadoc-...

    hadoop-hdfs-client-2.9.1-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-client-2.9.1.jar; 赠送原API文档:hadoop-hdfs-client-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-client-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-client-2.9.1.pom;...

    hadoop-hdfs-2.7.3-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.5.1-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.7.3-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.7.3.jar; 赠送原API文档:hadoop-hdfs-2.7.3-javadoc.jar; 赠送源代码:hadoop-hdfs-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.7.3.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.5.1-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.5.1.jar; 赠送原API文档:hadoop-hdfs-2.5.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.5.1.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.6.5-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.6.5.jar; 赠送原API文档:hadoop-hdfs-2.6.5-javadoc.jar; 赠送源代码:hadoop-hdfs-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.6.5.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.9.1-API文档-中英对照版.zip

    赠送jar包:hadoop-hdfs-2.9.1.jar; 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar; 赠送源代码:hadoop-hdfs-2.9.1-sources.jar; 赠送Maven依赖信息文件:hadoop-hdfs-2.9.1.pom; 包含翻译后的API文档:hadoop...

    hadoop-hdfs-2.9.1-API文档-中文版.zip

    赠送jar包:hadoop-hdfs-2.9.1.jar 赠送原API文档:hadoop-hdfs-2.9.1-javadoc.jar 赠送源代码:hadoop-hdfs-2.9.1-sources.jar 包含翻译后的API文档:hadoop-hdfs-2.9.1-javadoc-API文档-中文(简体)版.zip 对应...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码

    Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】---- 代码 Hadoop 3.x(HDFS)----【HDFS 的 API 操作】--...

    hadoop源码分析-HDFS&MapReduce

    在Hadoop这个分布式计算框架中,HDFS(Hadoop Distributed File System)和MapReduce是两个核心组件,它们共同构建了大数据处理的基础架构。HDFS提供了高容错性的分布式存储,而MapReduce则提供了大规模数据集的并行...

    hadoop-hdfs-2.4.1.jar

    hadoop-hdfs-2.4.1.jar

    hadoop-hdfs-2.7.3

    hadoop-hdfs-2.7.3搭建flume1.7需要用到的包,还有几个包也有提供

    hadoop-hdfs-2.7.7.jar

    flume 想要将数据输出到hdfs,必须要有hadoop相关jar包。本资源是hadoop 2.7.7版本

    hadoop-hdfs-2.2.0.jar

    hadoop-hdfs-2.2.0.jar 点击下载资源即表示您确认该资源不违反资源分享的使用条款

    hadoop-hdfs-test-0.21.0.jar

    hadoop-hdfs-test-0.21.0.jar

Global site tag (gtag.js) - Google Analytics