`
liaobinxu
  • 浏览: 43288 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

学习 Hadoop 源代码分析(一零)

阅读更多



学习 Hadoop 源代码分析(一零)


在继续分析DataNode之前,我们有必要看一下系统的工作状态。启动HDFS的

时候,我们可以选择以下启动参数:

启动选项枚举 StartupOption

   FORMAT  ("-format"):格式化系统

   REGULAR ("-regular"):正常启动

   BACKUP  ("-backup"):备份

   CHECKPOINT("-checkpoint"):检查点

   UPGRADE ("-upgrade"):升级

  ROLLBACK("-rollback"):回滚

   FINALIZE("-finalize"):提交

   IMPORT  ("-importCheckpoint"):从Checkpoint恢复




启动参数相关的命名节点NamenodeRole命名节点角色

ACTIVE: NameNode  命名节点(正常启动)

BACKUP: Backup Node 备份节点

CHECKPOINT: Checkpoint Node 检查点 节点

STANDBY: Standby Node: 备份节点



作为一个大型的分布式系统,Hadoop内部实现了一套升级机制(http://wiki.apache.org/hadoop/Hadoop_Upgrade)。upgrade参数就是为了这

个目的而存在的,当然,升级可能成功,也可能失败。如果失败了,那就用rollback进行回滚;如果过了一段时间,系统运行正常,那就可以通过

finalize,正式提交这次升级(跟数据库有点像啊)。importCheckpoint选项用于NameNode发生故障后,从某个检查点恢复。有了上面的描述,我们得到下面左边的状态图:




大家应该注意到,上面的升级/回滚/提交都不可能一下就搞定,就是说,系统故障时,它可能处于上面右边状态中的某一个。特别是分布式的各个节点上,甚至

可能出现某些节点已经升级成功,但有些节点可能处于中间状态的情况,所以Hadoop采用类似于数据库事务的升级机制也就不是很奇怪。大家先理解一下上面的状态图,它是下面我们要介绍DataNode存储的基础。


下图是升级时候 cluster / namenode , datanode 的时序图



DataNode收到以后,会将当前的数据块文件目录改名,从current改名为previous.tmp,建立一个snapshot,然后重建current目录。重建包括重建VERSION文件,重建对应的子目录,然后建立数据块文件和数据块元数据文件到revious.tmp的硬连接。建立硬连接意味着在系统中只保留一份数据块文件和数据块元数据文件,current和previous.tmp中的相应文件,在存储中,只保留一份。当所有的这些工作完成以后,会在current里写入新的VERSION文件,并将previous.tmp目录改名为previous,完成升级。




1) NORMAL - > UPGRADE

 

void org.apache.hadoop.hdfs.server.datanode.DataStorage.doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException
 
 


具体实现

 

 void doUpgrade(StorageDirectory sd,
                 NamespaceInfo nsInfo
                 ) throws IOException {
    LOG.info("Upgrading storage directory " + sd.getRoot()
             + ".\n   old LV = " + this.getLayoutVersion()
             + "; old CTime = " + this.getCTime()
             + ".\n   new LV = " + nsInfo.getLayoutVersion()
             + "; new CTime = " + nsInfo.getCTime());
    File curDir = sd.getCurrentDir();
    File prevDir = sd.getPreviousDir();
    assert curDir.exists() : "Current directory must exist.";
    // Cleanup directory "detach"
    cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
    // delete previous dir before upgrading
    if (prevDir.exists())
      deleteDir(prevDir);
    File tmpDir = sd.getPreviousTmp();
    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
    // rename current to tmp
    rename(curDir, tmpDir);
    // hard link finalized & rbw blocks
    linkAllBlocks(tmpDir, curDir);
    // create current directory if not exists
    if (!curDir.exists() && !curDir.mkdirs())
      throw new IOException("Cannot create directory " + curDir);
    // write version file
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    assert this.namespaceID == nsInfo.getNamespaceID() :
      "Data-node and name-node layout versions must be the same.";
    this.cTime = nsInfo.getCTime();
    sd.write();
    // rename tmp to previous
    rename(tmpDir, prevDir);
    LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
  }
 

doUpgrade步骤

1.  清除detach文件夹(备份)

2.  清除previous 

3. current 文件夹改名为tmp(previous.tmp)

4. 重建current(重建的过程就是,把tmp文件的 all finalized and RBW blocks内容复制到current)

5. 保存从namenode获取的namespaceid和创建时间

6. 修改tmp(previous.tmp文件为previous



2). UPGRADE --(ROLLBACK) -- > NORMAL


void org.apache.hadoop.hdfs.server.datanode.DataStorage.doRollback(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException
 

 

 具体

 

void doRollback( StorageDirectory sd,
                   NamespaceInfo nsInfo
                   ) throws IOException {
    File prevDir = sd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (!prevDir.exists())
      return;
    DataStorage prevInfo = new DataStorage();
    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
    prevSD.read(prevSD.getPreviousVersionFile());

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
          && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback(previous.tmp
      throw new InconsistentFSStateException(prevSD.getRoot(),
                                             "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
                                             + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
                                             + " is newer than the namespace state: LV = "
                                             + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
    LOG.info("Rolling back storage directory " + sd.getRoot()
             + ".\n   target LV = " + nsInfo.getLayoutVersion()
             + "; target CTime = " + nsInfo.getCTime());
    File tmpDir = sd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // rename current to tmp
    File curDir = sd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);
    // rename previous to current
    rename(prevDir, curDir);
    // delete tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
  }
 



1. 获取previous version版本

2. 删除tmp目录

3. 把current目录改为tmp目录(remove.tmp)

4. 把previous 目录该为current

5. 删除tmp目录(remove.tmp)



3). UPGRADE --(FINALIZE)-- NORMAL

 

void org.apache.hadoop.hdfs.server.datanode.DataStorage.doFinalize(StorageDirectory sd) throws IOException
 

 此操作是管理员手动的操作(数据节点 通过heartbeat 获得命令)(不用停掉这个集群, 管理员命令)

 

 void doFinalize(StorageDirectory sd) throws IOException {
    File prevDir = sd.getPreviousDir();
    if (!prevDir.exists())
      return; // already discarded
    final String dataDirPath = sd.getRoot().getCanonicalPath();
    LOG.info("Finalizing upgrade for storage directory " 
             + dataDirPath 
             + ".\n   cur LV = " + this.getLayoutVersion()
             + "; cur CTime = " + this.getCTime());
    assert sd.getCurrentDir().exists() : "Current directory must exist.";
    final File tmpDir = sd.getFinalizedTmp();
    // rename previous to tmp
    rename(prevDir, tmpDir);

    // delete tmp dir in a separate thread
    new Daemon(new Runnable() {
        public void run() {
          try {
            deleteDir(tmpDir);
          } catch(IOException ex) {
            LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
          }
          LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
        }
        public String toString() { return "Finalize " + dataDirPath; }
      }).start();
  }
 


1. 把previous 目录改名为 tmp目录(finalize.tmp)

2. 单独启动一个线程用于删除tmp目录(finalize.tmp),原因是删除可能非常耗时,doFinalize是在心跳中完成的,重启一个线程保证心跳的高效。

 


 




 

StorageInfo包含了3个字段,分别是layoutVersion:版本号,如果Hadoop调整文件结构布局,版本号就会修改,这样可以保证文件结构和应用一致。namespaceID是Storage的ID,cTime,creation time。

和StorageInfo相比,Storage就是个大家伙了。

Storage可以包含多个根(参考配置项dfs.data.dir的说明),这些根通过Storage的内部类StorageDirectory来表示。

StorageDirectory中最重要的方法是analyzeStorage,它将根据系统启动时的参数和我们上面提到的一些判断条件,返回系统现在的状态。StorageDirectory可能处于以下的某一个状态(与系统的工作状态一定的对应):

 

 

NON_EXISTENT:指定的目录不存在;

NOT_FORMATTED:指定的目录存在但未被格式化;

 

COMPLETE_UPGRADE:previous.tmp存在,current也存在

RECOVER_UPGRADE:previous.tmp存在,current不存在

 

COMPLETE_FINALIZE:finalized.tmp存在,current也存在

 

 

 

COMPLETE_ROLLBACK:removed.tmp存在,current也存在,previous不存在

RECOVER_ROLLBACK:removed.tmp存在,current不存在,previous存在

 

COMPLETE_CHECKPOINT:lastcheckpoint.tmp存在,current也存在

RECOVER_CHECKPOINT:lastcheckpoint.tmp存在,current不存在

 

 

 

NORMAL:普通工作模式。

 

 

StorageDirectory处于某些状态是通过发生对应状态改变需要的工作文件夹和正常工作的current夹来进行判断。状态改

变需要的工作文件夹包括:

previous:用于升级后保存以前版本的文件

previous.tmp:用于升级过程中保存以前版本的文件

removed.tmp:用于回滚过程中保存文件

finalized.tmp:用于提交过程中保存文件

lastcheckpoint.tmp:应用于从NameNode中,导入一个检查点

previous.checkpoint:应用于从NameNode中,结束导入一个检查点

 

 

 


 

有了这些状态,就可以对系统进行恢复(通过方法doRecover)。恢复的动作如

下(结合上面的状态转移图):

 


 

COMPLETE_UPGRADE:mv previous.tmp -> previous

RECOVER_UPGRADE:mv previous.tmp -> current

COMPLETE_FINALIZE:rm finalized.tmp

COMPLETE_ROLLBACK:rm removed.tmp

RECOVER_ROLLBACK:mv removed.tmp -> current

COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp ->

previous.checkpoint

RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current

 

 

 

 

 

当升级某个集群的Hadoop的时候,正如任何软件的升级一样,可能会引入新的bug或者不兼容的修改导致现有的应用出现过去没有发现的问题。在所有重要的HDFS安装应用中,是不允许出现因丢失任何数据需要从零开始重启

HDFS的情况。HDFS允许管理员恢复到 Hadoop的早期版本,并且将集群的状态回滚到升级前。HDFS的升级细节请参考 upgrade wiki 。HDFS在任何时间只能有一个备份,因此在升级前,管理员需要通过'bin/hadoop dfsadmin

-finalizeUpgrade'命令移除现有的备份。下面简要描述了典型的升级过程:


1)在升级Hadoop前,如果已经存在备份,需要先结束(finalize)它。可以通过'dfsadmin  -upgradeProgress status'命令查询集群是否需要执行finalize

2)停止集群,分发部署新版本的Hadoop

3)执行新版本的hadoop,通过添加 -upgrade 选项,例如/bin/start-dfs.sh -upgrade

4)大多数情况下,集群在升级后可以正常运行。一旦新的HDFS在运行若干天的操作后没有出现问题,那么就可以结束(finalize)这次升级。请注意,在升级前删除的文件并不释放在datanode上的实际磁盘空间,直到集群被结(finalize)升级前。

5)如果有需要回到老版本的Hadoop,那么可以:

a)停止集群,分发部署老版本的Hadoop

b)通过rollback选项启动集群,例如bin/start-dfs.sh -rollback

 

  • 描述: 图片
  • 大小: 14.7 KB
  • 大小: 121.6 KB
  • 大小: 14.5 KB
  • 大小: 286.7 KB
分享到:
评论

相关推荐

    Hadoop源代码分析

    《Hadoop源代码分析》是一本深入探讨Hadoop核心组件MapReduce的专著。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算框架,以处理和存储大量数据。MapReduce是Hadoop的核心计算模型,它通过将大...

    Hadoop源代码分析完整版.rar

    这份“Hadoop源代码分析完整版”文档深入剖析了Hadoop的内部工作机制,帮助我们深入了解这一开源框架的精髓。 1. **Hadoop概述** Hadoop是由Apache基金会开发的开源分布式计算框架,它主要由Hadoop Distributed ...

    Hadoop源代码分析(一零)

    让我们看看Hadoop源代码,以便于进行分析。这是第十部分,Word版

    Hadoop源代码分析(四零)

    Hadoop源代码分析的第四章节主要关注的是DFSOutputStream的内部工作原理。DFSOutputStream是Hadoop分布式文件系统(HDFS)中用于客户端写入数据的核心类。在这个部分,我们将深入探讨其构造函数、数据包的处理以及与...

    Hadoop源代码分析(二)

    在深入Hadoop源代码分析的第二部分中,我们将聚焦于Hadoop的核心组件和它们的功能,以便更好地理解这个分布式计算框架的内部运作。以下是对各关键包的详细解释: 1. **PackageDependences** 这个包包含了Hadoop中...

    Hadoop源代码分析(完整版).doc

    在深入分析Hadoop源代码之前,我们先了解一下Hadoop的核心组件和它们的作用。Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储海量数据。它主要由两个核心部分组成:HDFS(Hadoop ...

    hadoop 源代码归档 3

    【描述】:“Hadoop源代码归档3”指的是一个包含Hadoop开源项目源代码的压缩文件集合,可能是版本更新或特定开发阶段的代码备份。Hadoop是一个由Apache基金会开发的分布式计算框架,广泛应用于大数据处理和分析。这...

    hadoop源代码打包归档

    很抱歉,根据您给出的信息,"hadoop源代码打包归档"这个主题与提供的压缩包文件内容不匹配。压缩包内的文件看起来是法律讲座的资料,而非与Hadoop相关的源代码或技术文档。Hadoop是一个开源的大数据处理框架,通常...

    Hadoop源代码分析(三零)

    在Hadoop源代码分析的第三十章节中,主要探讨了HDFS(Hadoop Distributed File System)中的几个关键操作,包括`abandonBlock`、`addBlock`、`complete`等,这些都是与文件内容管理和写入流程密切相关的功能。...

    hadoop权威指南源代码

    总的来说,"tomwhite-hadoop-book-src"这个压缩包内的源代码是学习Hadoop理论知识与实践经验的宝贵资源。通过深入研究和实践,你可以更好地理解分布式计算的精髓,提升自己在大数据领域的技能。

    Hadoop源代码分析(二三)

    ### Hadoop源代码分析——FSDirectory深入解析 #### 一、引言 在深入了解Hadoop内部机制的过程中,源代码的分析是不可或缺的一环。本文将聚焦于Hadoop中的`FSDirectory`类,它是HDFS(Hadoop分布式文件系统)核心...

    hadoop源代码部分

    阅读Hadoop源代码有助于理解其内部工作原理,从而优化自己的分布式应用程序。你可以学习错误处理机制、并发控制、网络通信等编程技巧。同时,通过设置断点和使用调试工具,你可以深入到代码执行的每个步骤,提升...

    hadoop源代码归档 1

    很抱歉,根据您提供的信息,压缩包的文件名称列表似乎包含了一些与Hadoop源代码无关的文件,如壁纸分享和数字组合,这可能是一个误解。然而,鉴于标题和描述提及的是"Hadoop源代码归档 1",我们可以专注于讨论Hadoop...

    Hadoop源代码分析(MapTask辅助类 I)

    ### Hadoop MapTask辅助类源代码分析 #### 一、概述 Hadoop作为一个分布式计算框架,其核心组件之一是MapReduce。MapReduce负责处理大规模数据集的并行运算任务,而MapTask作为MapReduce的核心组成部分之一,其...

    Hadoop实战源代码(HadoopinAction_source_code)

    《Hadoop实战源代码》(HadoopinAction_source_code)是针对大数据处理框架Hadoop的一份珍贵资源,其中包含了从知名书籍《Hadoop in Action》官网获取的实际代码示例。这些示例涵盖了Hadoop的核心组件及其应用,为...

    Hadoop Real-World Solutions Cookbook 源代码

    《Hadoop Real-World Solutions Cookbook 源代码》是一本针对Hadoop实际应用问题解决方案的实战指南,书中通过丰富的示例代码帮助读者理解和解决在大数据处理中遇到的各种挑战。源代码包含了书中各个章节的关键实现...

    Hadoop实战.源代码

    通过研究这些源代码,学习者可以深入了解Hadoop的内在运作机制,提升处理大数据问题的能力。这些实战示例对于开发者来说极具价值,它们不仅提供了理论知识的验证,更能在实践中提高技能,帮助开发者解决实际工作中...

Global site tag (gtag.js) - Google Analytics