`
zhangjun5965
  • 浏览: 12317 次
社区版块
存档分类
最新评论

hadoop源码解析之hdfs写数据全流程分析---创建文件

阅读更多

 

 

http://zhangjun5965.iteye.com/admin/blogs/2386382

概述

hdfs中写数据应该是hdfs中最复杂的业务之一了,hadoop中的每个文件由多个block组成,每个块又有多个备份,这些备份又放在了不同的机器上,所以新建文件的时候会向namenode申请block所在的机器。

hdfs中每个block默认情况下是128M,由于每个块比较大,所以在写数据的过程中是把数据块拆分成一个个的数据包以管道的形式发送的,所以hdfs文件的写入会涉及到客户端、namenode、datanode多个模块的交互。我们基于hadoop-2.7.3的源码将这些流程一个个的分开来学习下。

DFSClient创建文件

先看一段最简单的hdfs api写文件的例子。

Configuration conf = new Configuration();  
FileSystem fs = FileSystem.get(conf);  
Path file = new Path("hdfs://127.0.0.1:9000/example.txt");  
FSDataOutputStream outStream = fs.create(file);  
out.write("java api write data".getBytes("UTF-8"));   
outStream.close();

通过 FileSystem.get(conf); 来构造了一个FileSystem 实例,这里对应的是DistributedFileSystem,通过调用DistributedFileSystem里面的create方法创建了一个文件,并且返回了这个文件的输出流,用于写入数据。

DistributedFileSystem的create方法有很多重载的方法,最终调用了DistributedFileSystem的下面的这个create方法

@Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }

在这里,调用了DFSClient的create方法来创建文件

create(String, FsPermission, EnumSet<CreateFlag>, boolean, short, long, Progressable, int, ChecksumOpt, InetSocketAddress[])

在这里create方法里,通过DFSOutputStream的静态方法newStreamForCreate构建了一个对象,并且返回了一个DFSOutputStream对象。

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    TraceScope scope =
        dfsClient.getPathTraceScope("newStreamForCreate", src);
    try {
      HdfsFileStatus stat = null;

      // Retry the create if we get a RetryStartFileException up to a maximum
      // number of times
      boolean shouldRetry = true;
      int retryCount = CREATE_RETRY_COUNT;
      while (shouldRetry) {
        shouldRetry = false;
        try {
          //通过调用namenode的create方法来创建文件
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS);
          break;
        } catch (RemoteException re) {
          IOException e = re.unwrapRemoteException(
              AccessControlException.class,
              DSQuotaExceededException.class,
              FileAlreadyExistsException.class,
              FileNotFoundException.class,
              ParentNotDirectoryException.class,
              NSQuotaExceededException.class,
              RetryStartFileException.class,
              SafeModeException.class,
              UnresolvedPathException.class,
              SnapshotAccessControlException.class,
              UnknownCryptoProtocolVersionException.class);
          if (e instanceof RetryStartFileException) {
            if (retryCount > 0) {
              shouldRetry = true;
              retryCount--;
            } else {
              throw new IOException("Too many retries because of encryption" +
                  " zone operations", e);
            }
          } else {
            throw e;
          }
        }
      }
      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
      
      //构造了一个DFSOutputStream对象,即刚刚创建的文件的输出流.
      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
          
     //start方法启动了DFSOutputStream的内部类DataStreamer,用于接收要写入的数据包
      out.start();
      return out;
    } finally {
      scope.close();
    }
  }

通过dfsClient.namenode.create在hdfs的目录树上创建了一个文件,然后通过new DFSOutputStream创建了一个该文件的输出流实例,在DFSOutputStream构造方法中,初始化了用于数据处理的DFSOutputStream类的内部类DataStreamer,用于启动DataStreamer线程,接受客户端写入数据包的请求。

DataStreamer是一个线程,它的启动是通过DFSOutputStream的start方法来启动的

/** Construct a new output stream for creating a file. */
  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
      EnumSet<CreateFlag> flag, Progressable progress,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    this(dfsClient, src, progress, stat, checksum);
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);

    streamer = new DataStreamer(stat, null);
    if (favoredNodes != null && favoredNodes.length != 0) {
      streamer.setFavoredNodes(favoredNodes);
    }
  }

namenode创建文件

上述dfsClient.namenode.create是调用了客户端和namenode交互的接口ClientProtocol中的create方法来创建文件,之后由ClientProtoco的实现类 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB中的create方法封装了创建文件所需的信息,通过rpc的方式发送到了namenode来处理。

最终的实现方法是NameNodeRpcServer类的create方法,之后经过FSNamesystem的startFile、startFileInt,最后在方法startFileInternal中实现具体的逻辑。

1.首先检查是否是一个目录,如果是的话抛出异常.
2.检查是否有写的权限。
3.检查是否创建父目录
4.检查create字段,用户是否创建为文件
5.检查是否覆盖源文件,如果true的话,则删除原来的旧文件。

最后调用了FSDirectory的addFile方法来创建文件。 iip = dir.addFile(parent.getKey(), parent.getValue(), permissions, replication, blockSize, holder, clientMachine);

具体的操作就是找到该文件的父目录,然后在父目录的List<INode>类型的对象children中添加一条数据。

具体的代码如下:

/**
  * 创建一个新文件或者覆盖一个已经存在的文件
  * Create a new file or overwrite an existing file<br>
  * 
  * Once the file is create the client then allocates a new block with the next
  * call using {@link ClientProtocol#addBlock}.
  * <p>
  * For description of parameters and exceptions thrown see
  * {@link ClientProtocol#create}
  */
 private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
     INodesInPath iip, PermissionStatus permissions, String holder,
     String clientMachine, boolean create, boolean overwrite, 
     boolean createParent, short replication, long blockSize, 
     boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
     EncryptedKeyVersion edek, boolean logRetryEntry)
     throws IOException {
   assert hasWriteLock();
   // Verify that the destination does not exist as a directory already.
   final INode inode = iip.getLastINode();
   final String src = iip.getPath();
   //检查是否存在,并且是不是一个目录
   if (inode != null && inode.isDirectory()) {
     throw new FileAlreadyExistsException(src +
         " already exists as a directory");
   }

   //检查是否有写的权限
   final INodeFile myFile = INodeFile.valueOf(inode, src, true);
   if (isPermissionEnabled) {
     if (overwrite && myFile != null) {
       dir.checkPathAccess(pc, iip, FsAction.WRITE);
     }
     /*
      * To overwrite existing file, need to check 'w' permission 
      * of parent (equals to ancestor in this case)
      */
     dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
   }
   //是否创建父目录
   if (!createParent) {
     dir.verifyParentDir(iip, src);
   }

   FileEncryptionInfo feInfo = null;

   final EncryptionZone zone = dir.getEZForPath(iip);
   if (zone != null) {
     // The path is now within an EZ, but we're missing encryption parameters
     if (suite == null || edek == null) {
       throw new RetryStartFileException();
     }
     // Path is within an EZ and we have provided encryption parameters.
     // Make sure that the generated EDEK matches the settings of the EZ.
     final String ezKeyName = zone.getKeyName();
     if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
       throw new RetryStartFileException();
     }
     feInfo = new FileEncryptionInfo(suite, version,
         edek.getEncryptedKeyVersion().getMaterial(),
         edek.getEncryptedKeyIv(),
         ezKeyName, edek.getEncryptionKeyVersionName());
   }

   try {
     BlocksMapUpdateInfo toRemoveBlocks = null;
     if (myFile == null) {
     //是否创建文件
       if (!create) {
         throw new FileNotFoundException("Can't overwrite non-existent " +
             src + " for client " + clientMachine);
       }
     } else {
        //是否覆盖
       if (overwrite) {
         toRemoveBlocks = new BlocksMapUpdateInfo();
         List<INode> toRemoveINodes = new ChunkedArrayList<INode>();
         //删除旧文件
         long ret = FSDirDeleteOp.delete(dir, iip, toRemoveBlocks,
                                         toRemoveINodes, now());
         if (ret >= 0) {
           iip = INodesInPath.replace(iip, iip.length() - 1, null);
           FSDirDeleteOp.incrDeletedFileCount(ret);
           removeLeasesAndINodes(src, toRemoveINodes, true);
         }
       } else {
         // If lease soft limit time is expired, recover the lease
         recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
             iip, src, holder, clientMachine, false);
         throw new FileAlreadyExistsException(src + " for client " +
             clientMachine + " already exists");
       }
     }

     checkFsObjectLimit();
     INodeFile newNode = null;

     // Always do an implicit mkdirs for parent directory tree.
     Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
         .createAncestorDirectories(dir, iip, permissions);
     if (parent != null) {
       //具体的操作,创建文件
       iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
           replication, blockSize, holder, clientMachine);
       newNode = iip != null ? iip.getLastINode().asFile() : null;
     }

     if (newNode == null) {
       throw new IOException("Unable to add " + src +  " to namespace");
     }
     leaseManager.addLease(newNode.getFileUnderConstructionFeature()
         .getClientName(), src);

     // Set encryption attributes if necessary
     if (feInfo != null) {
       dir.setFileEncryptionInfo(src, feInfo);
       newNode = dir.getInode(newNode.getId()).asFile();
     }

     setNewINodeStoragePolicy(newNode, iip, isLazyPersist);

     // record file record in log, record new generation stamp
     getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
     NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added {}" +
         " inode {} holder {}", src, newNode.getId(), holder);
     return toRemoveBlocks;
   } catch (IOException ie) {
     NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
         ie.getMessage());
     throw ie;
   }
 }
分享到:
评论

相关推荐

    Hadoop_2.X_HDFS源码剖析_带索引书签目录_徐鹏

    《Hadoop_2.X_HDFS源码剖析》是由徐鹏编著的一本深入解析Hadoop 2.x版本中HDFS(Hadoop Distributed File System)源码的专业书籍。这本书旨在帮助读者理解HDFS的核心机制,提升在分布式存储系统方面的专业技能。 ...

    高可用性的HDFS:Hadoop分布式文件系统深度实践

    5.5.2 创建文件情景分析 5.6 AvatarNode Standby故障切换过程 5.7 元数据一致性保证机制 5.7.1 元数据目录树信息 5.7.2 Data Node与Block数据块映射信息 5.8 Block更新同步问题 5.8.1 问题描述 5.8.2 结论 5.8.3 ...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    hadoop源码.zip

    本篇文章将围绕"Hadoop源码"这一主题,深度探讨HDFS的底层实现机制,帮助读者从源码层面理解其工作原理。 一、HDFS概述 HDFS是基于Google发表的GFS论文设计的分布式文件系统,旨在处理和存储大量数据。它采用了主从...

    java操作hadoop之mapreduce分析年气象数据最低温度实战源码

    通过这个实战项目,学习者不仅可以掌握Java操作Hadoop MapReduce的基本方法,还能深入了解大数据处理流程,以及如何从海量气象数据中提取有价值的信息。此外,对于提升数据处理能力和分布式计算的理解也大有裨益。...

    Hadoop_HDFS安装和管理.pdf

    - **Namenode**:作为HDFS的核心组件之一,Namenode负责管理文件系统的命名空间以及客户端的文件访问元数据。文档提到使用一台主机作为主Namenode (`ost2`),并通过热备方案配置了另一台从Namenode (`ost3`)。 - *...

    MySql准实时同步数据到HDFS(单机版).docx

    - 创建Flume配置文件,定义从Kafka消费数据并写入HDFS的agent。 - 启动Flume agent。 7. **Hadoop**: - 安装Hadoop 3.3.0,配置HDFS相关参数。 - 初始化HDFS,启动NameNode和DataNode。 ### **数据同步流程** ...

    hadoop源码分析

    《深入剖析Hadoop源码:理解其核心流程与机制》 Hadoop,作为大数据处理领域的重要框架,其源码分析对于开发者来说具有极高的价值。本文将深入探讨Hadoop的核心组件,包括Configuration、JobClient、JobConf以及...

    hadoop源代码存档

    4. MapReduce工作流程:分析job.xml配置文件,理解JobTracker如何解析和调度任务,TaskTracker如何执行任务,以及shuffle和sort过程的实现。 5. 容错机制:探究Hadoop如何实现硬件故障的自动检测和数据恢复,如心跳...

    hadoop2x-eclipse-plugin-original

    5. **Hadoop-Eclipse插件功能**:该插件提供HDFS的浏览和文件操作功能,可以在Eclipse内创建、编辑和运行MapReduce程序,同时还可以直接提交作业到Hadoop集群进行测试和生产运行。 6. **开发环境集成**:通过...

    java操作hadoop之mapreduce计算整数的最大值和最小值实战源码

    在大数据处理领域,Hadoop是不可或缺的一个开源框架,它提供了分布式存储(HDFS)和分布式计算(MapReduce)的能力。本教程将详细讲解如何使用Java编程语言操作Hadoop的MapReduce来计算整数序列中的最大值和最小值,...

    hadoop简单示例源码

    在这个阶段,mapper将读取数据,解析日期和温度,然后为每一年的气温创建键值对(年份作为键,当年的最高气温作为值)。 3. **排序与分区**:经过map阶段后,数据会被按照键(年份)进行排序,并分配到不同的reduce...

    hadoop-2.9.2.tar.gz

    启动Hadoop服务前,确保HDFS的数据目录已创建且拥有正确权限。运行初始化命令`hadoop namenode -format`,然后启动DataNode、NameNode、ResourceManager、NodeManager等服务。测试Hadoop是否正常运行,可以使用`...

    protobuf-hadoop

    Hadoop则是大数据处理的基石,它提供了一个分布式文件系统(HDFS)和MapReduce计算模型,用于处理和存储大量数据。 在Hadoop生态系统中,数据通常以文本格式(如CSV或JSON)存储,这在某些情况下可能会导致较高的...

    基于hadoop-2.6.0-cdh5.4.3版本的源码阅读,以注释及博客的形式记录阅读笔记.zip

    《Hadoop 2.6.0-cdh5.4.3 源码解析与学习笔记》 在当今大数据处理领域,Hadoop 是一个至关重要的开源框架,它为大规模数据处理提供了分布式计算的能力。本资料主要针对 Hadoop 2.6.0-cdh5.4.3 版本进行源码阅读,...

    HadoopDemo_hadoopDemo_nationhb8_hadoop_源码

    HDFS是Hadoop的核心组件之一,它是一种分布式文件系统,能够将大量数据存储在由多台普通服务器组成的集群上。通过Java API,我们可以实现对HDFS的基本操作,如文件的创建、读取、写入和删除。在`HadoopDemo`项目中,...

    hadoop配置

    文档《Hadoop源代码分析完整版》、《Hadoop0.20.0源码流程分析》和《Hadoop源码的入门解析》将深入讲解Hadoop的内部机制,包括数据读写流程、任务调度和资源管理等。 五、Linux操作命令 在Hadoop环境中,经常需要...

    cloudera-hive-cdh6.3.2源码包

    Hive 是一个基于 Hadoop 的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供简单的 SQL 查询功能,用来进行数据分析。在 CDH(Cloudera Distribution Including Apache Hadoop)6.3.2 版本中,...

Global site tag (gtag.js) - Google Analytics