好奇分布式存储是怎么实现的,如何能将一个文件存储到HDFS上,HDFS的文件目录只是一个空壳,真正存储数据的是DataNode,那么当我们把一个文件放到HDFS上的时候,集群都做了哪些工作呢 ?也就是执行命令copyFromLocal这个命令都做了哪些操作
首先命令肯定对应着源码里面的某一个方法,这个方法是FsShell类的copyFromLocal,代码:
void copyFromLocal(Path[] srcs, String dstf) throws IOException { // 创建目标路径 Path dstPath = new Path(dstf); // 获取目录存储目标路径的文件系统 FileSystem dstFs = dstPath.getFileSystem(getConf()); if (srcs.length == 1 && srcs[0].toString().equals("-")) { copyFromStdin(dstPath, dstFs); } else { dstFs.copyFromLocalFile(false, false, srcs, dstPath); } }
文件的拷贝是通过类FileUtil累的copy方法实现的:
public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException { // 检查目标路径是否合法 dst = checkDest(src.getName(), dstFS, dst, overwrite); if (srcFS.getFileStatus(src).isDir()) { // 检查目标目录是否是合理的目录 checkDependencies(srcFS, src, dstFS, dst); if (!dstFS.mkdirs(dst)) { return false; } FileStatus contents[] = srcFS.listStatus(src); for (int i = 0; i < contents.length; i++) { // 递归调用当前方法,如果原目标是文件,那么执行else if 代码块 copy(srcFS, contents[i].getPath(), dstFS, new Path(dst, contents[i].getPath().getName()), deleteSource, overwrite, conf); } } else if (srcFS.isFile(src)) { InputStream in = null; OutputStream out = null; try { in = srcFS.open(src); // 创建目标路径,在分布式中如何创建很重要 out = dstFS.create(dst, overwrite); IOUtils.copyBytes(in, out, conf, true); } catch (IOException e) { IOUtils.closeStream(out); IOUtils.closeStream(in); throw e; } } }
文件的拷贝需要打开源文件流和目标文件流,目标文件流是通过DFSClient的create方法实现,创建一个DFSOutputStream流:
public OutputStream create(String src, FsPermission permission, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getDefault(); } FsPermission masked = permission .applyUMask(FsPermission.getUMask(conf)); LOG.debug(src + ": masked=" + masked); // src 为要拷贝到的目标路径, 文件块大小blockSize应该是io.bytes.per.checksum // 大小的n倍,否则会出现异常 OutputStream result = new DFSOutputStream(src, masked, overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); leasechecker.put(src, result); return result; }
在创建DFSOutputStream流的时候都做了什么工作,具体看创建方法,在DFSOutputStream中开启了DataStreamer进程,这个进程在后面的数据写入的时候扮演者重要的角色:
DFSOutputStream(String src, FsPermission masked, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum, replication); computePacketChunkSize(writePacketSize, bytesPerChecksum); try { namenode.create(src, masked, clientName, overwrite, createParent, replication, blockSize); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileAlreadyExistsException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } streamer.start(); }
DataStreamer进程起来以后,开启与目标文件的通道,等待DataQueue队列有数据后,将数据写入到目标文件中,目标文件其实是DataNode上的文件,熟称block,关于如何寻找相应的block,可以从上面的另一条主线,创建文件的源码中查看。
...... // get packet to be sent. if (dataQueue.isEmpty()) { one = new Packet(); // heartbeat packet } else { // 从队列中获取一个 Packet one = dataQueue.getFirst(); // regular data // packet } ...... // 如果某一块的数据已经读取完,开启下一个块的连接 // if (blockStream == null) { LOG.debug("Allocating new block"); nodes = nextBlockOutputStream(src); this.setName("DataStreamer for file " + src + " block " + block); response = new ResponseProcessor(nodes); response.start(); } ...... // blockStream向clinet(也就是某个DataNode)发送数据 blockStream.write(buf.array(), buf.position(), buf.remaining());
这个进程会等待数据的来临,那么数据从何而来,看IOUtils的copyBytes方法,它判断是否是PrintStream流,这个用于打印到控制台:
public static void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException { PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null; byte buf[] = new byte[buffSize]; int bytesRead = in.read(buf); while (bytesRead >= 0) { // 这个另有乾坤,不要简单的把out想象成OutputStream // 这个out方法最终会调用DFSClient.DFSOutputStream.writeChunk(..) out.write(buf, 0, bytesRead); if ((ps != null) && ps.checkError()) { throw new IOException("Unable to write to output stream."); } bytesRead = in.read(buf); } }
这个out从刚才的原来看应该是FSDataOutputStream,追踪write方法,会到FSOutputSummer类的writeChecksumChunk方法中:
private void writeChecksumChunk(byte b[], int off, int len, boolean keep) throws IOException { int tempChecksum = (int) sum.getValue(); if (!keep) { sum.reset(); } int2byte(tempChecksum, checksum); writeChunk(b, off, len, checksum); }
这个方法通过调用自身的抽象方法writeChunk方法来实现写数据,这个抽象的方法由DFSOutputStream实现,在writeChunk方法中将源文件的数据装载到DataQueue中,这样原先的DataStreamer进程就可以从DataQueue中读取数据并写如到指定的block中,具体可以看代码的实现。
private synchronized void enqueueCurrentPacket() { synchronized (dataQueue) { if (currentPacket == null) return; dataQueue.addLast(currentPacket); dataQueue.notifyAll(); lastQueuedSeqno = currentPacket.seqno; currentPacket = null; } }
相关推荐
Hadoop 源码解析_Yarn 源码解析 Hadoop 是一个基于 Java 的大数据处理框架,Yarn 是 Hadoop 的资源管理器,负责资源分配、任务调度和集群管理。下面是 Yarn 源码解析的知识点: 1. MR 程序提交 MR(MapReduce)...
Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...
Hadoop 源码解析 - DataNode Hadoop 作为一个大数据处理框架,其核心组件之一是分布式文件系统(HDFS),而 DataNode 是 HDFS 中的重要组件之一。DataNode 负责存储和管理数据块,提供数据访问服务。本文将对 ...
Hadoop 源代码分析 Hadoop 是一个开源的分布式计算框架,由 Apache 基金会维护。Hadoop 的核心组件包括 HDFS(Hadoop Distributed File System)和 MapReduce。HDFS 是一个分布式文件系统,可以存储大量的数据,而 ...
- 源码解析:深入Hadoop源码,研究如NameNode、DataNode、MapTask和ReduceTask等关键类的功能实现。 - 故障恢复和容错机制:探讨Hadoop如何处理硬件故障,保持数据完整性。 - 性能调优:学习如何通过调整参数和...
这份源码解析深入探讨了这一过程,让我们逐一解析其主要环节。 首先,Job提交流程始于`waitForCompletion()`方法调用,接着`submit()`建立与集群的连接。`Cluster`对象被用来确定Job是在本地运行还是在YARN集群中...
#### 二、Hadoop架构解析 - **Hadoop生态系统**:Hadoop不仅包括HDFS和MapReduce两大核心组件,还包括了其他多个重要的子项目和技术,形成了一个完整的生态系统。 - **包结构及依赖**:Hadoop的包结构非常复杂,这...
在对Hadoop源码进行编译的过程中,确保正确地安装和配置所有必要的软件包是至关重要的。以下将详细阐述这些软件包的作用以及如何在编译Hadoop源码时使用它们。 1. **protobuf (Protocol Buffers)** Protocol ...
本文将深入探讨Hadoop的源码,帮助你理解其核心机制,提升你在大数据处理领域的专业技能。 首先,我们要了解Hadoop的两个核心组件:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一种分布式文件系统...
【标题】"实战hadoop中的源码"涵盖了在大数据处理领域深入理解并应用Apache Hadoop的核心技术。Hadoop是开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。通过研究Hadoop的源码,开发者可以深入了解...
本文将深入探讨在编译Hadoop CDH源码时所需的软件及其重要性。 首先,我们来看“google-snappy-ea660b5”。Snappy是由Google开发的一个高效的数据压缩库,它主要关注的是高速度而非最高压缩率。在Hadoop中,Snappy...
本文将详细介绍如何编译和安装Hadoop源码,确保你能够按照提供的步骤顺利进行。 首先,我们需要了解Hadoop的基本概念。Hadoop是由Apache软件基金会开发的,它基于Java语言,实现了MapReduce编程模型和分布式文件...
2. **Hadoop源码获取**:从Apache Hadoop的官方Git仓库(如GitHub或Apache Git)克隆源码,或者下载源码的zip/tarball文件。解压到本地的开发目录。 3. **Maven的POM.xml**:Hadoop项目中的`pom.xml`文件是Maven的...
很抱歉,根据您提供的文件信息,"hadoop源码归档.zip"的描述中并没有包含任何与Hadoop源码相关的具体知识点。标签虽然指出了"Hadoop",但压缩包内的文件名称列表却与Hadoop或者IT技术无关,而是包含了各种文化和法律...
《深入剖析Hadoop 2.8.1源码:分布式系统的智慧结晶》 Hadoop,作为开源的大数据处理框架,自2006年诞生以来,一直是大数据领域的重要支柱。其2.8.1版本是Hadoop发展的一个关键节点,为用户提供了更稳定、高效的...
Hadoop的构建过程依赖于Maven来解析依赖关系、编译源代码、运行测试、打包和部署项目。Maven通过其POM(Project Object Model)文件来管理项目的构建配置。安装Maven后,开发者可以在命令行中执行`mvn compile`、`...
本篇文章将详细探讨Hadoop源码的相关知识,以及如何获取和解析Hadoop的源代码。 Hadoop的核心组件主要包括两个主要部分:Hadoop Distributed File System (HDFS) 和 MapReduce。HDFS是一种分布式文件系统,用于存储...