- 浏览: 61616 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
软件开发学习者lilonghui:
学习了,以前真没注意过这种细节
关于C/C++main函数 -
Branding:
受教!感谢!
html块级元素和内联元素
HDFS写入文件的重要概念
HDFS一个文件由多个block构成。HDFS在进行block读写的时候是以packet(默认每个packet为64K)为单位 进行的。每一个packet由若干个chunk(默认512Byte)组成。Chunk是进行数据校验的基本单位,对每一个chunk生成一个校验和(默 认4Byte)并将校验和进行存储。
在写入一个block的时候,数据传输的基本单位是packet,每个packet由若干个chunk组成。
HDFS客户端写文件示例代码
FileSystem hdfs = FileSystem.get(new Configuration());
Path path = new Path("/testfile");
// writing
FSDataOutputStream dos = hdfs.create(path);
byte[] readBuf = "Hello World".getBytes("UTF-8");
dos.write(readBuf, 0, readBuf.length);
dos.close();
hdfs.close();
文件的打开
上传一个文件到hdfs,一般会调用DistributedFileSystem.create,其实现如下:
public FSDataOutputStream create(Path f, FsPermission ermission,boolean overwrite,int bufferSize, short replication, long lockSize,Progressable progress) throws IOException
{
return new FSDataOutputStream (dfs.create(getPathName(f), permission,overwrite, replication, blockSize, progress, bufferSize), statistics);
}
其最终生成一个FSDataOutputStream用于向新生成的文件中写入数据。其成员变量dfs的类型为DFSClient,DFSClient的create函数如下:
public OutputStream create(String src,FsPermission permission,boolean overwrite,short replication,long blockSize,Progressable progress,int buffersize) throws IOException
{
checkOpen();
if (permission == null)
{
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
OutputStream result = new DFSOutputStream(src, masked,overwrite, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512));
leasechecker.put(src, result);
return result;
}
其中构造了一个DFSOutputStream,在其构造函数中,同过RPC调用NameNode的create来创建一个文件。
当然,构造函数中还做了一件重要的事情,就是streamer.start(),也即启动了一个pipeline,用于写数据,在写入数据的过程中,我们会仔细分析。
DFSOutputStream(String src, FsPermission masked, boolean overwrite,short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException
{
this(src, blockSize, progress, bytesPerChecksum);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try
{
namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} catch(RemoteException re)
{
throw re.unwrapRemoteException(AccessControlException.class,QuotaExceededException.class);
}
streamer.start();
}
通过rpc调用NameNode的create函数,调用namesystem.startFile函数,其又调用startFileInternal函数,它创建一个新的文件,状态为under construction,没有任何data block与之对应。
dfsclient文件的写入
下面轮到客户端向新创建的文件中写入数据了,一般会使用FSDataOutputStream的write方法:
按照hdfs的设计,对block的数据写入使用的是pipeline的方式,也即将数据分成一个个的package,如果需要复制三分,分别写入DataNode 1, 2, 3,则会进行如下的过程:
首先将package 1写入DataNode 1
然后由DataNode 1负责将package 1写入DataNode 2,同时客户端可以将pacage 2写入DataNode 1
然后DataNode 2负责将package 1写入DataNode 3, 同时客户端可以讲package 3写入DataNode 1,DataNode 1将package 2写入DataNode 2
就这样将一个个package排着队的传递下去,直到所有的数据全部写入并复制完毕
FSDataOutputStream的write方法会调用DFSOutputStream的write方法,而DFSOutputStream继承自FSOutputSummer,所以实际上是调用FSOutputSummer的write方法,如下:
public synchronized void write(byte b[], int off, int len)
throws IOException
{
//参数检查
for (int n=0;n<len;n+=write1(b, off+n, len-n)) { }
}
FSOutputSummer的write1的方法如下:
private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length)
{
// buf初始化的大小是chunk的大小,默认是512,这里的代码会在写入的数据的剩余内容大于或等于一个chunk的大小时调用
// 这里避免多余一次复制
final int length = buf.length;
sum.update(b, off, length);//length是一个完整chunk的大小,默认是512,这里根据一个chunk内容计算校验和
writeChecksumChunk(b, off, length, false);
return length;
}
// buf初始化的大小是chunk的大小,默认是512,这里的代码会在写入的数据的剩余内容小于一个chunk的大小时调用
// 规避了数组越界问题
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
sum.update(b, off, bytesToCopy);//bytesToCopy不足一个chunk,是写入的内容的最后一个chunk的剩余字节数目
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {//如果不足一个chunk,就缓存到本地buffer,如果还有下一次写入,就填充这个chunk,满一个chunk再flush,count清0
// local buffer is full
flushBuffer();//最终调用writeChecksumChunk方法实现
}
return bytesToCopy;
}
writeChecksumChunk的实现如下:
//写入一个chunk的数据长度(默认512),忽略len的长度
private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
throws IOException
{
int tempChecksum = (int)sum.getValue();
if (!keep)
{
sum.reset();
}
int2byte(tempChecksum, checksum);//把当前chunk的校验和从int转换为字节
writeChunk(b, off, len, checksum);
}
writeChunk由子类DFSOutputStream实现,如下:
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)throws IOException
{
//创建一个package,并写入数据
currentPacket = new Packet(packetSize, chunksPerPacket,bytesCurBlock);
currentPacket.writeChecksum(checksum, 0, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
//如果此package已满,则放入队列中准备发送
if (currentPacket.numChunks == currentPacket.maxChunks ||bytesCurBlock == blockSize) {
......
dataQueue.addLast(currentPacket);
//唤醒等待dataqueue的传输线程,也即DataStreamer
dataQueue.notifyAll();
currentPacket = null;
......
}
}
writeChunk比较简单,就是把数据填充packet,填充完毕,就放到dataQueue,再唤醒DataStreamer。
DataStreamer完成了数据的传输,DataStreamer的run函数如下:
public void run()
{
while (!closed && clientRunning) {
Packet one = null;
synchronized (dataQueue) {
boolean doSleep = processDatanodeError(hasError, false);
//如果ack出错,则处理IO错误
//如果队列中没有package,则等待
while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) {
try {
dataQueue.wait(1000);
} catch (InterruptedException e) {
}
doSleep = false;
}
try {
//得到队列中的第一个package
one = dataQueue.getFirst();
long offsetInBlock = one.offsetInBlock;
//由NameNode分配block,并生成一个写入流指向此block
if (blockStream == null) {
nodes = nextBlockOutputStream(src);
response = new ResponseProcessor(nodes);
response.start();
}
ByteBuffer buf = one.getBuffer();
//将packet从dataQueue移至ackQueue,等待确认
dataQueue.removeFirst();
dataQueue.notifyAll();
synchronized (ackQueue) {
ackQueue.addLast(one);
ackQueue.notifyAll();
}
//利用生成的写入流将数据写入DataNode中的block
blockStream.write(buf.array(), buf.position(), buf.remaining());
if (one.lastPacketInBlock) {
blockStream.writeInt(0); //表示此block写入完毕
}
blockStream.flush();
} catch (Throwable e) {
}
if (one.lastPacketInBlock) {
//数据块写满,做一些清理工作,下次再申请块
response.close(); // ignore all errors in Response
synchronized (dataQueue) {
IOUtils.cleanup(LOG, blockStream, blockReplyStream);
nodes = null;
response = null;
blockStream = null;
//设置为null,下次就会判断blockStream为null,申请新的块
blockReplyStream = null;
}
}
}
......
}
DataStreamer线程负责把准备好的数据packet,顺序写入到DataNode,未确认写入成功的packet则移动到ackQueue,等待确认。
DataStreamer线程传输数据到DataNode时,要向namenode申请数据块,方法是nextBlockOutputStream,再调用locateFollowingBlock,通过RPC调用namenode.addBlock(src, clientName),在NameNode分配了DataNode和block以后,createBlockOutputStream开始写入数据。
客户端在DataStreamer的run函数中创建了写入流后,调用blockStream.write将packet写入DataNode
DataStreamer还会启动ResponseProcessor线程,它负责接收datanode的ack,当接收到所有 datanode对一个packet确认成功的ack,ResponseProcessor从ackQueue中删除相应的packet。在出错时,从 ackQueue中移除packet到dataQueue,移除失败的datanode,恢复数据块,建立新的pipeline。实现如下:
public void run() {
...
PipelineAck ack = new PipelineAck();
while (!closed && clientRunning && !lastPacketInBlock) {
try {
// read an ack from the pipeline
ack.readFields(blockReplyStream);
...
//处理所有DataNode响应的状态
for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
short reply = ack.getReply(i);
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {//ack验证,如果DataNode写入packet失败,则出错
errorIndex = i; //记录损坏的DataNode,会在processDatanodeError方法移除该失败的DataNode
throw new IOException("Bad response " + reply + " for block " + block + " from datanode " + targets[i].getName());
}
}
long seqno = ack.getSeqno();
if (seqno == Packet.HEART_BEAT_SEQNO) { // 心跳ack,忽略
continue;
}
Packet one = null;
synchronized (ackQueue) {
one = ackQueue.getFirst();
}
...
synchronized (ackQueue) {
assert ack.getSeqno() == lastAckedSeqno + 1;//验证ack
lastAckedSeqno = ack.getSeqno();
ackQueue.removeFirst();//移除确认写入成功的packet
ackQueue.notifyAll();
}
} catch (Exception e) {
if (!closed) {
hasError = true;//设置ack错误,让
...
closed = true;
}
}
}
}
当ResponseProcessor在确认packet失败时,processDatanodeError方法用于处理datanode的错误,当调用返回后需要休眠一段时间时,返回true。下面是其简单的处理流程:
1.关闭blockStream和blockReplyStream
2.将packet从ackQueue移到dataQueue
3.删除坏datanode
4.通过RPC调用datanode的recoverBlock方法来恢复块,如果有错,返回true
5.如果没有可用的datanode,关闭DFSOutputStream和streamer,返回false
6.创建块输出流,如果不成功,转到3
实现如下:
private boolean processDatanodeError(boolean hasError, boolean isAppend) {
if (!hasError) {//DataNode没有发生错误,直接返回
return false;
}
//将未确认写入成功的packets从ack queue移动到data queue的前面
synchronized (ackQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}
boolean success = false;
while (!success && clientRunning) {
DatanodeInfo[] newnodes = null;
//根据errorIndex确定失败的DataNode,从所有的DataNode nodes移除失败的DataNode,复制到newnodes
// 通知primary datanode做数据块恢复,更新合适的时间戳
LocatedBlock newBlock = null;
ClientDatanodeProtocol primary = null;
DatanodeInfo primaryNode = null;
try {
// Pick the "least" datanode as the primary datanode to avoid deadlock.
primaryNode = Collections.min(Arrays.asList(newnodes));
primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken, socketTimeout);
newBlock = primary.recoverBlock(block, isAppend, newnodes);//恢复数据块
} catch (IOException e) {
//循环创建块输出流,如果不成功,移除失败的DataNode
return true; // 需要休眠
} finally {
RPC.stopProxy(primary);
}
recoveryErrorCount = 0; // 数据块恢复成功
block = newBlock.getBlock();
accessToken = newBlock.getBlockToken();
nodes = newBlock.getLocations();
this.hasError = false;
lastException = null;
errorIndex = 0;
success = createBlockOutputStream(nodes, clientName, true);
}
response = new ResponseProcessor(nodes);
response.start();//启动ResponseProcessor做ack确认处理
return false; // 不休眠,继续处理
}
相关推荐
在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录结构、对象序列化、...
Hadoop 源码分析 HDFS 数据流 Hadoop 的 HDFS(Hadoop Distributed File System)是 Hadoop 项目中最核心的组件之一,它提供了高可靠、高-performance 的分布式文件系统。HDFS 的核心组件包括 Namenode、Datanode、...
**HDFS源码分析** 1. **NameNode与DataNode**:HDFS的核心组件包括NameNode和DataNode。NameNode作为元数据管理节点,存储文件系统的命名空间信息和文件的块映射信息。DataNode则是数据存储节点,负责存储实际的...
通过阅读和理解Hadoop源码分析-HDFS部分.pdf文档,我们可以更深入地理解这些组件的工作原理,掌握HDFS在处理大数据时的内部机制。这对于我们优化HDFS的性能,解决实际问题,以及开发相关的分布式应用都具有重要的...
这个"基于HDFS+FTP的文件存储与迁移实验代码.zip"包含了一个名为"HDFS_FTP_ForMyProject-master"的项目源码,这为我们提供了一个实际操作的平台。这里我们将详细讲解HDFS、FTP以及它们在人工智能领域的应用。 **...
5.8.3 源码分析 第6章 AvatarNode使用 6.1 方案说明 6.1.1 网络拓扑 6.1.2 操作系统安装及配置 6.2 使用Avatar打补丁版本 6.2.1 Hadoop源码联机Build 6.2.2 Hadoop源码本地Build 6.2.3 NFS服务器构建 6.2.4 Avatar...
《Hadoop_2.X_HDFS源码剖析》是由徐鹏编著的一本深入解析Hadoop 2.x版本中HDFS(Hadoop ...通过阅读本书,读者不仅可以理解HDFS的工作原理,还能掌握如何通过源码分析来解决问题,从而更好地利用和优化Hadoop环境。
HDFS的主要功能组件包括HDFS的核心类和接口,体系结构涉及HDFS的整体设计,NameNode是HDFS的核心,负责管理文件系统的命名空间,维护文件系统的文件树及整个HDFS的元数据,而DataNode则负责存储实际的数据。在HDFS...
HDFS 客户端数据流程分析是指在 Hadoop 分布式文件系统中,客户端如何与 HDFS 进行交互,读取和写入数据的过程。本文将对 HDFS 客户端数据流程进行详细的分析和解释。 读取数据流程 1. 客户端发起读取数据请求,...
这个"Hadoop源码分析视频下载"提供了一种深入理解Hadoop内部工作原理的途径,这对于开发者、系统管理员以及对大数据技术感兴趣的人来说是非常有价值的。接下来,我们将详细探讨Hadoop的核心组件、其设计哲学、源码...
在HDFS中,源码分析主要涉及NameNode、DataNode和客户端三部分。NameNode是HDFS的主节点,负责管理文件系统的命名空间和访问控制信息,以及维护文件块到DataNode的映射。DataNode是HDFS的数据节点,存储实际的数据块...
5. 源码分析:通过阅读HDFS的源码,可以深入了解其内部工作原理,包括数据块迁移的实现细节,以及路由节点如何处理请求和决策。 通过这样的学习,我们可以更好地优化HDFS的使用,提高集群性能,预防和解决数据分布...
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
### Storm源码分析 #### 一、Storm简介与应用场景 Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时...
Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。...对于 log4j 数据,Flume 可以实时监控日志文件并将其无缝地导入 HDFS,为后续的大数据分析提供基础。
总的来说,Hadoop分布式文件系统HDFS以其高容错、高吞吐量和面向大数据处理的特点,成为大数据分析和处理的首选工具。其设计考虑了硬件故障、大规模数据处理、数据复制和分布的复杂性,确保了系统在大规模部署时的...
MapReduce是Hadoop生态系统中的核心组件,主要用于处理和存储大规模数据。...通过阅读《Job本地提交过程源码分析及图解》这样的文档,我们可以深入学习MapReduce的工作原理,提升我们的Hadoop编程技能。
本文将基于“Hadoop学习总结和源码分析”这一主题,结合提供的文档资源,深入探讨Hadoop的核心组件HDFS(Hadoop Distributed File System)和MapReduce。 首先,我们从“Hadoop学习总结之一:HDFS简介.doc”开始,...