当我们用命令:
hadoop fs -copyFromLocal localfile hdfs://...
将本地文件复制到HDFS时,其背后的复制过程是怎样的?本地文件通过什么方式传输到datanode上的呢?
这里面很显然的是:
1、文件在多个电脑之间进行了传输(至少有2台电脑:本地电脑和一个datanode节点)。
2、如果文件超过一个block的大小(默认是64M),那么将一个文件分割成多个block是在哪里发生的?
带着这些疑问,我们来解读一下源代码。
一、找到“幕后英雄”
通过简单的跟踪,就会发现这一功能是由
FileSystem类的copyFromLocalFile方法完成的。
继续跟踪,会发现拷贝其实是在2个文件系统中进行的,下面是
FileUtil类的一段代码:
/** Copy files between FileSystems. */
public static boolean copy(FileSystem srcFS, Path src,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
... // 为了突出重要代码,这里省略了部分代码
InputStream in=null;
OutputStream out = null;
try {
in = srcFS.open(src);
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true);
} catch (IOException e) {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
throw e;
}
...
}
copyFromLocalFile的实质是将文件从
LocalFileSystem复制到DistributedFileSystem
从上面的代码可以看出,复制的关键是:
1、获得本地文件系统的输入流(用来读取本地文件)
2、获得HDFS的输出流(用来向HDFS写入数据)
3、从第一流读取数据,写入第二个流。
从LocalFileSystem获取输入流很简单。问题是,如何获取DistributedFileSystem的输出流呢?
继续读代码,会发现:
DistributedFileSystem借助了DFSClient类,来实现客户端与HDFS之间文件的传输任务。
在DFSClient类中,创建流的是这一句:
OutputStream result = new DFSOutputStream(src, masked,
overwrite, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
所以,所有的奥秘就应该在类DFSOutputStream中了。
二、解读DFSOutputStream
DFSOutputStream负责将数据传输到HDFS中,既然数据是在本地读取的,又要保存在另外一台机器(datanode)上,所以这里面一定会涉及到Socket。
通过阅读DFSOutputStream源码,果然发现了DFSOutputStream对底层的socket通信进行的包装的细节,先说说DFSOutputStream中的几个变量:
private Socket s; // 与datanode之间建立的socket连接
private DataOutputStream blockStream; // socket的输出流(client->datanode),用于将数据传输给datanode
private DataInputStream blockReplyStream; // socket的输入流(datanode->client),用户收到datanode的确认包
除了socket和流以外,DFSOutputStream还有2个队列和2个线程:
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
// dataQueue是数据队列,用于保存等待发送给datanode的数据包
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
// ackQueue是确认队列,保存还没有被datanode确认接收的数据包
...
private DataStreamer streamer = new DataStreamer();;
// streamer线程,不停的从dataQueue中取出数据包,发送给datanode
private ResponseProcessor response = null;
// response线程,用于接收从datanode返回的反馈信息
所以,在向DFSOutputStream中,写入数据(通常是byte数组)的时候,实际的传输过程是:
1、byte[]被封装成64KB的Packet,然后扔进dataQueue中
2、DataStreamer线程不断的从dataQueue中取出Packet,通过socket发送给datanode(向blockStream写数据)
发送前,将当前的Packet从dataQueue中移除,并addLast进ackQueue
3、ResponseProcessor线程从blockReplyStream中读出从datanode的反馈信息
反馈信息很简单,就是一个seqno,再加上每个datanode返回的标志(成功标志为DataTransferProtocol.OP_STATUS_SUCCESS)
通过判断seqno(序列号,每个Packet有一个序列号),判断datanode是否接收到正确的包。
只有收到反馈包中的seqno与ackQueue.getFirst()的包seqno相同时,说明正确。否则可能出现了丢包的情况。
如果一切OK,则从ackQueue中移出:ackQueue.removeFirst(); 说明这个Packet被datanode成功接收了。
三、datanode端是怎么接收数据的?
上面分析的代码都位于客户端,那么datanode端的代码又如何呢?
答案是:
在DataNode端,有一个Daemon线程:dataXceiverServer,它有一个用于数据传输的ServerSocket一直开在那里。
每当有client连接到datanode时,datanode会new一个DataXceiver
DataXceiver负责数据的传输工作。
如果是写操作(client->datanode),则调用writeBlock方法:
case DataTransferProtocol.OP_WRITE_BLOCK:
writeBlock( in );
writeBlock方法负责:将数据写入本地磁盘,并负责将数据传输给其他datanode,保证数据的拷贝数目(可以通过dfs.replication设置)。
具体负责数据接收的是这一行:
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets.length);
几个参数的含义:
DataOutputStream mirrOut, // output to next datanode
// 下一个datanode的输出流
DataInputStream mirrIn, // input from next datanode
// 下一个datanode的输入流
DataOutputStream replyOut, // output to previous datanode
// 数据来源节点(可能是最初的client)的输出流
// 用来发送反馈通知包
String mirrAddr, BlockTransferThrottler throttlerArg,
int numTargets) throws IOException {
在receiveBlock方法中,循环接收数据:
/*
* Receive until packet length is zero.
*/
while (receivePacket() > 0) {}
在receivePacket方法中:
不断地从输入流中读取Packet数据:
int payloadLen = readNextPacket();
并将数据传输至下一个datanode节点:
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
写入磁盘:
out.write(pktBuf, dataOff, len);
四、如果一个文件超过1个block大小,怎么重定向到新的datanode的?在哪里分割的(file分割成blocks)?
答案是:在DFSOutputStream类的writeChunk方法里。
line 3043:
if (bytesCurBlock == blockSize) { // 问题是:它们能正好相等吗?万一bytesCurBlock > blockSize了怎么办?
currentPacket.lastPacketInBlock = true;
bytesCurBlock = 0;
lastFlushOffset = -1;
}
再往下几行:
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
就是说,在new每个新的Packet之前,都会重新计算一下新的Packet的大小,
以保证新的Packet大小不会超过Block的剩余大小
如果block还有不到一个Packet的大小(比如还剩3kb的空间),则最后一个Packet的大小就是:
blockSize-bytesCurBlock,也就是3kb
line 2285:
// get new block from namenode.
if (blockStream == null) {
LOG.debug("Allocating new block");
nodes = nextBlockOutputStream(src);
this.setName("DataStreamer for file " + src +
" block " + block);
response = new ResponseProcessor(nodes);
response.start();
}
在DataStreamer中,如果遇到one.lastPacketInBlock==true,则将blockStream设为null,之后会重新写入新的block。
**THE END**
分享到:
相关推荐
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是这个框架的一个稳定版本,它包含了多个改进和优化,以提高性能和稳定性。在这个版本中,Winutils.exe和hadoop.dll是两...
Hadoop是一个开源的分布式计算框架,由Apache基金会开发,它主要设计用于处理和存储大量数据。在提供的信息中,我们关注的是"Hadoop的dll文件",这是一个动态链接库(DLL)文件,通常在Windows操作系统中使用,用于...
在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储和计算的能力。本文将详细探讨与"Hadoop.dll"和"winutils.exe"相关的知识点,以及它们在Hadoop-2.7.1版本中的作用。 Hadoop.dll是Hadoop在...
Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在普通硬件上高效处理大量数据。在Windows环境下,Hadoop的使用与Linux有所不同,因为它的设计最初是针对Linux操作系统的。"winutils"和"hadoop.dll...
在Hadoop生态系统中,`hadoop.dll`和`winutils.exe`是两个关键组件,尤其对于Windows用户来说,它们在本地开发和运行Hadoop相关应用时必不可少。`hadoop.dll`是一个动态链接库文件,主要用于在Windows环境中提供...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是Hadoop发展中的一个重要版本,它包含了众多的优化和改进,旨在提高性能、稳定性和易用性。在这个版本中,`hadoop.dll`...
在Hadoop生态系统中,Hadoop 2.7.7是一个重要的版本,它为大数据处理提供了稳定性和性能优化。Hadoop通常被用作Linux环境下的分布式计算框架,但有时开发者或学习者在Windows环境下也需要进行Hadoop相关的开发和测试...
在Hadoop生态系统中,`hadoop.dll`和`winutils.exe`是两个关键组件,尤其对于Windows用户来说。本文将详细介绍这两个文件以及它们在Hadoop 2.6.0版本中的作用。 `hadoop.dll`是Hadoop在Windows环境下运行所必需的一...
Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo 的工程师 Doug Cutting 和 Mike Cafarella Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo...
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...
标题 "hadoop2.6 hadoop.dll+winutils.exe" 提到的是Hadoop 2.6版本中的两个关键组件:`hadoop.dll` 和 `winutils.exe`,这两个组件对于在Windows环境中配置和运行Hadoop至关重要。Hadoop原本是为Linux环境设计的,...
在Windows环境下安装Hadoop 3.1.0是学习和使用大数据处理技术的重要步骤。Hadoop是一个开源框架,主要用于分布式存储和处理大规模数据集。在这个过程中,我们将详细讲解Hadoop 3.1.0在Windows上的安装过程以及相关...
Hadoop是一个由Apache软件基金会开发的开源框架,它允许使用简单的编程模型在分布式环境中存储和处理大数据。它主要由四个核心组件构成:Hadoop Common、HDFS(Hadoop Distributed File System)、YARN(Yet Another...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。它是由Apache软件基金会开发并维护的,旨在实现高效、可扩展的数据处理能力。Hadoop的核心由两个主要组件构成:Hadoop Distributed ...
在windows环境下开发hadoop时,需要配置HADOOP_HOME环境变量,变量值D:\hadoop-common-2.7.3-bin-master,并在Path追加%HADOOP_HOME%\bin,有可能出现如下错误: org.apache.hadoop.io.nativeio.NativeIO$Windows....
Hadoop-2.7.0.tar是一个著名的开源分布式存储与计算系统Hadoop的安装包。Hadoop是由Apache软件基金会开发的一个分布式系统基础架构,主要解决大数据问题。Hadoop的设计初衷是可靠、高效、可伸缩地存储和处理大数据集...
Hadoop是Apache软件基金会开发的一个开源分布式计算框架,主要由HDFS(Hadoop Distributed File System)和MapReduce两大部分组成,旨在提供一种可靠、可扩展、高效的数据处理和存储解决方案。在标题中提到的...
在搭建Hadoop环境的过程中,经常会遇到一些特定的依赖问题,比如缺少`hadoop.dll`和`winutils.exe`这两个关键组件。本文将详细介绍这两个文件及其在Hadoop生态系统中的作用,以及如何解决它们缺失的问题。 首先,`...
在Hadoop生态系统中,`winutils.exe`和`hadoop.dll`是Windows环境下运行Hadoop必备的组件,尤其对于开发和测试环境来说至关重要。这里我们深入探讨这两个组件以及与Eclipse插件的相关性。 首先,`winutils.exe`是...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop2.6.0是这个框架的一个重要版本,它包含了多项优化和改进,以提高系统的稳定性和性能。在这个压缩包中,我们关注的是与Windows...