`
zhangbaoming815
  • 浏览: 149381 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop源码解析copyFromLocal

阅读更多

好奇分布式存储是怎么实现的,如何能将一个文件存储到HDFS上,HDFS的文件目录只是一个空壳,真正存储数据的是DataNode,那么当我们把一个文件放到HDFS上的时候,集群都做了哪些工作呢 ?也就是执行命令copyFromLocal这个命令都做了哪些操作

首先命令肯定对应着源码里面的某一个方法,这个方法是FsShell类的copyFromLocal,代码:

    void copyFromLocal(Path[] srcs, String dstf) throws IOException
    {
        // 创建目标路径
        Path dstPath = new Path(dstf);
        
        // 获取目录存储目标路径的文件系统
        FileSystem dstFs = dstPath.getFileSystem(getConf());
        
        if (srcs.length == 1 && srcs[0].toString().equals("-"))
        {
            copyFromStdin(dstPath, dstFs);
        }
        else
        {
            
            dstFs.copyFromLocalFile(false, false, srcs, dstPath);
        }
    }

 文件的拷贝是通过类FileUtil累的copy方法实现的:

    public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS,
            Path dst, boolean deleteSource, boolean overwrite,
            Configuration conf) throws IOException
    {
        // 检查目标路径是否合法
        dst = checkDest(src.getName(), dstFS, dst, overwrite);

        if (srcFS.getFileStatus(src).isDir())
        {
            // 检查目标目录是否是合理的目录
            checkDependencies(srcFS, src, dstFS, dst);
            if (!dstFS.mkdirs(dst))
            {
                return false;
            }
            FileStatus contents[] = srcFS.listStatus(src);
            for (int i = 0; i < contents.length; i++)
            {
                // 递归调用当前方法,如果原目标是文件,那么执行else if 代码块
                copy(srcFS, contents[i].getPath(), dstFS, new Path(dst,
                        contents[i].getPath().getName()), deleteSource,
                        overwrite, conf);
            }
        }
        else if (srcFS.isFile(src))
        {
            InputStream in = null;
            OutputStream out = null;
            try
            {
                in = srcFS.open(src);
                
                // 创建目标路径,在分布式中如何创建很重要
                out = dstFS.create(dst, overwrite);
                
                IOUtils.copyBytes(in, out, conf, true);
            }
            catch (IOException e)
            {
                IOUtils.closeStream(out);
                IOUtils.closeStream(in);
                throw e;
            }
        }
     }

 文件的拷贝需要打开源文件流和目标文件流,目标文件流是通过DFSClient的create方法实现,创建一个DFSOutputStream流:

    public OutputStream create(String src, FsPermission permission,
            boolean overwrite, boolean createParent, short replication,
            long blockSize, Progressable progress, int buffersize)
            throws IOException
    {
        checkOpen();
        if (permission == null)
        {
            permission = FsPermission.getDefault();
        }
        FsPermission masked = permission
                .applyUMask(FsPermission.getUMask(conf));
        LOG.debug(src + ": masked=" + masked);
        
        // src 为要拷贝到的目标路径, 文件块大小blockSize应该是io.bytes.per.checksum
        // 大小的n倍,否则会出现异常
        OutputStream result = new DFSOutputStream(src, masked, overwrite,
                createParent, replication, blockSize, progress, buffersize,
                conf.getInt("io.bytes.per.checksum", 512));
        leasechecker.put(src, result);
        return result;
    }

 在创建DFSOutputStream流的时候都做了什么工作,具体看创建方法,在DFSOutputStream中开启了DataStreamer进程,这个进程在后面的数据写入的时候扮演者重要的角色:

        DFSOutputStream(String src, FsPermission masked, boolean overwrite,
                boolean createParent, short replication, long blockSize,
                Progressable progress, int buffersize, int bytesPerChecksum)
                throws IOException
        {
            this(src, blockSize, progress, bytesPerChecksum, replication);

            computePacketChunkSize(writePacketSize, bytesPerChecksum);

            try
            {
                namenode.create(src, masked, clientName, overwrite,
                        createParent, replication, blockSize);
            }
            catch (RemoteException re)
            {
                throw re.unwrapRemoteException(AccessControlException.class,
                        FileAlreadyExistsException.class,
                        FileNotFoundException.class,
                        NSQuotaExceededException.class,
                        DSQuotaExceededException.class);
            }
            streamer.start();
        }

 DataStreamer进程起来以后,开启与目标文件的通道,等待DataQueue队列有数据后,将数据写入到目标文件中,目标文件其实是DataNode上的文件,熟称block,关于如何寻找相应的block,可以从上面的另一条主线,创建文件的源码中查看。

......

// get packet to be sent.
if (dataQueue.isEmpty())
{        
       one = new Packet(); // heartbeat packet
}
 else
{
        // 从队列中获取一个 Packet
        one = dataQueue.getFirst(); // regular data
                                                            // packet
}

......

// 如果某一块的数据已经读取完,开启下一个块的连接
// 
if (blockStream == null)
{
          LOG.debug("Allocating new block");
          nodes = nextBlockOutputStream(src);
          this.setName("DataStreamer for file " + src
                     + " block " + block);
           response = new ResponseProcessor(nodes);
           response.start();
}

......
// blockStream向clinet(也就是某个DataNode)发送数据
blockStream.write(buf.array(), buf.position(),
                 buf.remaining());

 这个进程会等待数据的来临,那么数据从何而来,看IOUtils的copyBytes方法,它判断是否是PrintStream流,这个用于打印到控制台:

    public static void copyBytes(InputStream in, OutputStream out, int buffSize)
            throws IOException
    {

        PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
        byte buf[] = new byte[buffSize];
        int bytesRead = in.read(buf);
        while (bytesRead >= 0)
        {
            // 这个另有乾坤,不要简单的把out想象成OutputStream
            // 这个out方法最终会调用DFSClient.DFSOutputStream.writeChunk(..)
            out.write(buf, 0, bytesRead);
            if ((ps != null) && ps.checkError())
            {
                throw new IOException("Unable to write to output stream.");
            }
            bytesRead = in.read(buf);
        }
    }

 这个out从刚才的原来看应该是FSDataOutputStream,追踪write方法,会到FSOutputSummer类的writeChecksumChunk方法中:

    private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
            throws IOException
    {
        int tempChecksum = (int) sum.getValue();
        if (!keep)
        {
            sum.reset();
        }
        int2byte(tempChecksum, checksum);
        writeChunk(b, off, len, checksum);
    }

 这个方法通过调用自身的抽象方法writeChunk方法来实现写数据,这个抽象的方法由DFSOutputStream实现,在writeChunk方法中将源文件的数据装载到DataQueue中,这样原先的DataStreamer进程就可以从DataQueue中读取数据并写如到指定的block中,具体可以看代码的实现。

        private synchronized void enqueueCurrentPacket()
        {
            synchronized (dataQueue)
            {
                if (currentPacket == null)
                    return;
                dataQueue.addLast(currentPacket);
                dataQueue.notifyAll();
                lastQueuedSeqno = currentPacket.seqno;
                currentPacket = null;
            }
        }

 

分享到:
评论

相关推荐

    hadoop 源码解析_yarn源码解析

    Hadoop 源码解析_Yarn 源码解析 Hadoop 是一个基于 Java 的大数据处理框架,Yarn 是 Hadoop 的资源管理器,负责资源分配、任务调度和集群管理。下面是 Yarn 源码解析的知识点: 1. MR 程序提交 MR(MapReduce)...

    Hadoop源码分析(完整版)

    Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...

    hadoop 源码解析-DataNode

    Hadoop 源码解析 - DataNode Hadoop 作为一个大数据处理框架,其核心组件之一是分布式文件系统(HDFS),而 DataNode 是 HDFS 中的重要组件之一。DataNode 负责存储和管理数据块,提供数据访问服务。本文将对 ...

    Hadoop源代码分析(完整版).pdf

    Hadoop 源代码分析 Hadoop 是一个开源的分布式计算框架,由 Apache 基金会维护。Hadoop 的核心组件包括 HDFS(Hadoop Distributed File System)和 MapReduce。HDFS 是一个分布式文件系统,可以存储大量的数据,而 ...

    Hadoop源码分析视频下载

    - 源码解析:深入Hadoop源码,研究如NameNode、DataNode、MapTask和ReduceTask等关键类的功能实现。 - 故障恢复和容错机制:探讨Hadoop如何处理硬件故障,保持数据完整性。 - 性能调优:学习如何通过调整参数和...

    hadoop源码解析-Job提交.pdf

    这份源码解析深入探讨了这一过程,让我们逐一解析其主要环节。 首先,Job提交流程始于`waitForCompletion()`方法调用,接着`submit()`建立与集群的连接。`Cluster`对象被用来确定Job是在本地运行还是在YARN集群中...

    Hadoop源码分析 完整版 共55章

    #### 二、Hadoop架构解析 - **Hadoop生态系统**:Hadoop不仅包括HDFS和MapReduce两大核心组件,还包括了其他多个重要的子项目和技术,形成了一个完整的生态系统。 - **包结构及依赖**:Hadoop的包结构非常复杂,这...

    hadoop源码编译所需软件包

    在对Hadoop源码进行编译的过程中,确保正确地安装和配置所有必要的软件包是至关重要的。以下将详细阐述这些软件包的作用以及如何在编译Hadoop源码时使用它们。 1. **protobuf (Protocol Buffers)** Protocol ...

    Hadoop源码分析完整版

    本文将深入探讨Hadoop的源码,帮助你理解其核心机制,提升你在大数据处理领域的专业技能。 首先,我们要了解Hadoop的两个核心组件:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一种分布式文件系统...

    实战hadoop中的源码

    【标题】"实战hadoop中的源码"涵盖了在大数据处理领域深入理解并应用Apache Hadoop的核心技术。Hadoop是开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。通过研究Hadoop的源码,开发者可以深入了解...

    Hadoop 源码编译所需软件

    本文将深入探讨在编译Hadoop CDH源码时所需的软件及其重要性。 首先,我们来看“google-snappy-ea660b5”。Snappy是由Google开发的一个高效的数据压缩库,它主要关注的是高速度而非最高压缩率。在Hadoop中,Snappy...

    hadoop源码编译安装包及安装步骤

    本文将详细介绍如何编译和安装Hadoop源码,确保你能够按照提供的步骤顺利进行。 首先,我们需要了解Hadoop的基本概念。Hadoop是由Apache软件基金会开发的,它基于Java语言,实现了MapReduce编程模型和分布式文件...

    编译Hadoop源码需要的maven文件

    2. **Hadoop源码获取**:从Apache Hadoop的官方Git仓库(如GitHub或Apache Git)克隆源码,或者下载源码的zip/tarball文件。解压到本地的开发目录。 3. **Maven的POM.xml**:Hadoop项目中的`pom.xml`文件是Maven的...

    hadoop源码归档.zip

    很抱歉,根据您提供的文件信息,"hadoop源码归档.zip"的描述中并没有包含任何与Hadoop源码相关的具体知识点。标签虽然指出了"Hadoop",但压缩包内的文件名称列表却与Hadoop或者IT技术无关,而是包含了各种文化和法律...

    hadoop-2.8.1源码

    《深入剖析Hadoop 2.8.1源码:分布式系统的智慧结晶》 Hadoop,作为开源的大数据处理框架,自2006年诞生以来,一直是大数据领域的重要支柱。其2.8.1版本是Hadoop发展的一个关键节点,为用户提供了更稳定、高效的...

    Hadoop源码编译需要工具

    Hadoop的构建过程依赖于Maven来解析依赖关系、编译源代码、运行测试、打包和部署项目。Maven通过其POM(Project Object Model)文件来管理项目的构建配置。安装Maven后,开发者可以在命令行中执行`mvn compile`、`...

    hadoop源码下载

    本篇文章将详细探讨Hadoop源码的相关知识,以及如何获取和解析Hadoop的源代码。 Hadoop的核心组件主要包括两个主要部分:Hadoop Distributed File System (HDFS) 和 MapReduce。HDFS是一种分布式文件系统,用于存储...

Global site tag (gtag.js) - Google Analytics