- 浏览: 78310 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
yuananyun:
图片看不清楚,要是能下载就好了
spark源码分析--spark的任务调度(补充一张图) -
QIAOtinger:
spark源码分析--rdd和stage的生成(更新了一张图) -
gaoshui87:
很好,学习了
开源力量spark公开课的ppt -
wangneng100:
请问PPT上传了吗,发到我邮箱一下,64947706@qq.c ...
开源力量spark公开课的ppt -
tanzek:
想请问楼主怎么调试源码呢?用idea的本地运行功能吗?
spark源码分析--rdd和stage的生成(更新了一张图)
hadoop 写操作的完整笔记
原创 转载请注明出处 http://baishuo491.iteye.com/blog/1958649 作者邮箱:vc_java@hotmail.com
Create操作
1.String clientMachine = getClientMachine();
2.namesystem.startFile(src,new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
null, masked),clientName, clientMachine, overwrite, createParent, replication, blockSize);
开始startFileInternal函数,写文件的重要过程
判断目标是否以路径的形式存在
clientNode:override-toString-DNDES:ubuntu|/default-rack|/default-rack|blockList实际上是个blockinfo:|null 参考5.6.2.1.2.3
全局的genstamp:1003
创建一个INodeFileUnderConstruction
开始调用leaseManager.addLease
结束调用leaseManager.addLease
put操作,最终会调用IOutils.copyBytes(in,out,buffersize) in.readbuf里面调用了read1,
如果读到的长度大于0,就把读到的数据,写入到一个输出流里面,接着继续in.readbuf,继续写入到输出流,如此循环。正常情况下,一次readBuf可以读4096字节,然后把这些字节传给out.write(buf, 0, bytesRead)后,则要循环8次,每次读入512字节数据,并生成4字节checksum
in.readbuf 调用 FSInputChecker(继承了FSInputStream)的read(byte[] b, int off, int len)函数
里面循环调用read1,如果read1返回 -1,表示读到了结尾
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;//注意packet的切换
}
在read1里面,如果len>=buf.length,就调用readChecksumChunk(b, off, len); 返回读入的字节数nRead, 这样len每次就减去nread
在readChecksumChunk(byte b[], int off, int len) 中()
ChecksumFileSystem$ChecksumFSInputChecker(FSInputChecker).readChecksumChunk(byte[], int, int)
调用read = readChunk(chunkPos, b, off, len, checksum);//read 可以是4096 也可以是2 和 -1
ChecksumFileSystem$ChecksumFSInputChecker.readChunk里面
int nread = readFully(datas, buf, offset, len);
FSInputChecker. readFully里面
for (;;) {
int nread = stm.read(buf, offset + n, len - n);
......................
}
最终调用RawLocalFileSystem$TrackingFileInputStream.read(byte[], int, int)
写入到输出流的过程如下
会调用FSDataOutPutStream$Positionache$write
循环调用write1 (DFSClient$DFSOutputStream(FSOutputSummer).write1(byte[], int, int))
1.先计算crc值
2.调用writeChecksumChunk
2.1 生成一个Checksum数组 4byte
2.2 writeChunk 传入buffer offset length checksum (要注意写 -1 那次)writeChunk每次写入512个字节
2.2.1 一系列检查
2.2.2 如果currentPacket是null(currentPacket在2.4.1.2被设置成null) 创建一个新的Packet,传入packetSize chunksperPacket byteCurBlock (65557,12, 0)
设置了一堆值 -----返回
2.3 写入系列操作
2.3.1向currentpacket 写入数据,先writeCheckSum 再writeData,都是用arraycopy拷贝到buf中
修改numChuncks和bytesCurBlock
2.3.3执行完2.3.1的写入,要把修改两个参数
currentPacket.numChunks++;
bytesCurBlock += len;//每次写入正常就是512
2.4 相关的临界操作
2.4.1根据2.3.3
修改过得参数作判断if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize)
2.4.1.1 特别的,如果满足第二个条件,表明一个块已经写满了
要更新如下的参数
currentPacket.lastPacketInBlock = true;//标明这是最后一个Packet
bytesCurBlock = 0; lastFlushOffset = 0;
2.4.1.2 无论是满足第一个条件,还是第二个条件,都要调用
enqueueCurrentPacket();
dataQueue.addLast(currentPacket);
dataQueue.notifyAll();**** 在DataStreamer的run函数里,会调用one = dataQueue.getFirst();标记01
lastQueuedSeqno = currentPacket.seqno;
currentPacket = null;
处理是否写满一个块 返回2,结束一个writeCheck
返回write1,返回length//512 (用作write1,修改传入参数用)
如果write结束,Position + = len incCount 更新 written
3转到DataNode上
3.1DataXReciever的run函数,读DFSClient通过createBlockOutputStream传过来的流,读到OP,如果是writetoblock指令,就调用DataXceiver的writeblock方法(每次这个函数被调用完,就会六大关闭)
3.1.1这个函数里面,
3.1.1.1首先通过DataXceiver的成员变量Socket s,创建一个DataOutputStream replyout,它对应的是createBlockOutputStream里面的blockReplyStream DataXceiver writetoblock里replyOut.writeShort(mirrorInStatus)对应 Client端 pipelineStatus = blockReplyStream.readShort();参考5.6.4.4
3.1.1.2 创建了一个blockReciever,在创建的过程中,Streams = datanode.data.writetoblock(......),这个stream里包含out和checksumout
writetoblock(FSDataSet)的过程:
3.1.1.2.1先设blocksize(调用 b.getNumBytes ) 然后检查ongoingcreates,如果有进程在创建这个block,就停掉它。
3.1.1.2.2 然后得到FSVolumn, 调用createTmpFile创建临时文件(在hdfs/data/blocksbeingWritt........下面)
然后volumeMap.put(b,new DataNodeBlockInfo(v,f))
ongoingCreates.put(b,new ActiveFile(f,threads))
3.1.1.2.3 调用File metafile = getMetaFile(f, b);//里面调用getGS,加上了GS 创建.meta文件
3.1.1.2.4 调用 createBlockWriteStreams( f , metafile)(FSDataSet); 这里面用FileOutPutStream,封装了RandomAccessFile
return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
null /*new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() )*/);
返回
从writetoblock返回后,得到一个Streams,通过Stream,可以给out和checksumout赋值(在这里可以做一些对应lun的修改)返回到创建blockReciever的地方
3.1.1.3 结束创建blockReciever后,replyOut会写两个东西,client端会收到(参考3.1.1.1)
replyOut.writeShort(mirrorInStatus);//0 Text.writeString(replyOut, firstBadLink);//firstBadLink =“”
3.1.1.4 blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets.length);//会长时间阻塞
这是一个有可能会长时间阻塞的函数,如果里面的流没有读到数据的话,这里面除了replyout,都是null
数据来自客户端,参考5.9,实际上,5.9的数据,被datanode接收到,是从3.1 一路传到这里的。
3.1.1.4.1 调用BlockMetadataHeader.writeHeader(checksumOut, checksum);//已经被注销掉了
主要写了这样一些东西
3.1.1.4.1.1 out.writeShort(header.getVersion());
3.1.1.4.1.2 header.getChecksum().writeHeader(out);
3.1.1.4.1.2.1 out.writeByte( type );//1
3.1.1.4.1.2.2 out.writeInt( bytesPerChecksum );//512
3.1.1.4.2 创建一个PacketResponder线程,并启动
new PacketResponder(this, block, mirrIn, replyOut, numTargets,Thread.currentThread())
3.1.1.4.3 再继续,blockReciever开始不停地调用receivePacket
while (receivePacket() > 0) {}//会因为blockStream.writeInt(0) 而结束(对应3.1.1.4.4)
3.1.1.4.3.1 payloadLen = readNextPacket();//1036
3.1.1.4.3.1.1 计算 chunksize = 516 (512 + 4)
3.1.1.4.3.1.2 计算 chunksperPacket(65536 -21 -4 + 516 -1)/516 约等于127
3.1.1.4.3.1.3 然后分配buf,容量为 21+4 +127*516 = 65557
.................................................
3.1.1.4.3.1.5 然后,在readToBuf(-1) 这个地方,开始读数据
......................................................
3.1.1.4.3.1.5.2 int nRead = in.read(buf.array(), buf.limit(), toRead);//如果输入流没有东西会阻塞,3.1.1.4的阻塞,就是在这里引起的,解除这里的阻塞,来自客户端发送的数据,参考5.9
3.1.1.4.3.1.5.3 bufRead = buf.limit() + nRead;//0 + 1057
3.1.1.4.3.1.5.4 buf.limit(bufRead);//表示这个位置之前的东西是有效的
3.1.1.4.3.1.5.5 return nRead;
3.1.1.4.3.1.6
3.1.1.4.3.2 buf.mark();
//read the header
buf.getInt(); // packet length 前4个 bytes
offsetInBlock = buf.getLong(); //得到当前packet,在block里面的offset 8个字节
long seqno = buf.getLong(); // get seqno 8个字节
boolean lastPacketInBlock = (buf.get() != 0);//buf读了一个字节 表明读到了设置的标记0了(对应5.10)
int endOfHeader = buf.position();//21(4+8+8+1) 原来21是这么来的?
buf.reset();//buf.position回归到mark的位置,回归的原因是3.1.1.4.6
3.1.1.4. 3.3 setBlockPosition(offsetInBlock);//FIXME offsetInBlock 来自前面的buf.getLong()
3.1.1.4. 3.4 先把packet写入mirror(只有一个副本不走这里)
3.1.1.4. 3.5 跳过21个字节,利用buf.position(endOfHeader); //21
然后读入一个int,得到的是文件的长度len(满的应该是65024)
3.1.1.4.3.7 先修改offsetInblock offsetInBlock += len;//len = 65024
3.1.1.4.3.8 计算checkSumOff = 508 ((65024 + 512 -1)/512) * 4
3.1.1.4.3.9 计算dataOff
checksumOff = buf.position();//25
int dataOff = checksumOff + checksumLen;//508 + 25
3.1.1.4.3.10 备份一份buff 到变量pktBuf
3.1.1.4.3.11 把buff的position设置成limit,使前面的数据不会被写入(为了下次写入?) buf.position(buf.limit());
3.1.1.4.3.12 调用verifyChunks 需要传入的参数 pktBuf(3.1.1.4.7.4),dataOff(3.1.1.4.7.3),len(3.1.1.4.6),和
checksumoff(3.1.1.4.7.2)
verifyChunks里,以512字节为长度,循环计算crc32校验,比如,如果这个文件有1024个字节,那么,数据前面crc占据的长度,就是1024/512 *4 = 8字节
3.1.1.4.3.13 如果还没有finalized,就写入磁盘
//写数据到磁盘!!! 底层调用 临时文件里的block里面的内容就增加了
out.write(pktBuf, dataOff, len);pktBuf这个buffer里面,dataOff后面,len个字节长度里的内容,全都是数据,
写入out
3.1.1.4.3.14 如果checksumOut不是null checksumOut.write(pktBuf, checksumOff, checksumLen);
把pktBuf中,checksumOff开始,checksumLen长度的部分,写入checksumOut
通过3.1.1.4.3.13 和3.1.1.4.3.14,可以看到,数据和checksum都是连续存放的,数据紧接着校验来存放
3.1.1.4.3.15 调用flush,里面包括out的flush和checksum的flush
3.1.1.4.3.16 之后更新datanode上的block的长度 datanode.data.setVisibleLength(block, offsetInBlock);//FIXME
从ongoingCreate里,拿到activeFile,更新之
3.1.1.4.3.17 ((PacketResponder)responder.getRunnable()).enqueue(seqno, lastPacketInBlock);
把seqno和lastPacketInBlock 加入到ackQueue(这会使3.1.1.4.2 创建的PacketResponder线程的run方法解除阻塞,参考3.1.1.4.2 参考6.1 )
3.1.1.4.3.18 返回 payloadLen // 65536 应该是数据加 校验的长度
返回到3.1.1.4.3的while (receivePacket() > 0)循环
如果返回的payloadLen>0 ,继续while循环
3.1.1.4.4 mirrorOut.writeInt(0); //标记块的结尾 mirrorOut.flush();
3.1.1.4.5 关闭responder(3.1.1.4.2创建)
3.1.1.4.6 block.setNumBytes(offsetInBlock);设置block的真实大小
3.1.1.4.7 datanode.data.finalizeBlock(block);和6.4.3是相同的调用,详情在那里看
3.1.1.5 datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
3.1.1.5.1 receivedBlockList.add(block);
3.1.1.5.2 delHints.add(delHint);
3.1.1.6 datanode.blockScanner.addBlock(block);
从3.1.1 writeblock 返回到3.1
回到DataXReciever的run里继续循环
在recievepacket里面
读出前面那21个字节,得到offsetInBlock,
5 在DataStreamer线程里DFSClient$DFSOutputStream$DataStreamer.run()
5.6 在标记01的后面nodes = nextBlockOutputStream(src); src是在DFSOutputStream的构造函数中传入的
这个位置是nextBlockOutputStream的唯一调用,也就是每次分配块,都是在这里弄的
nextBlockOutputStream基本上所有的操作都在一个有限次的while循环里,(retry && --count >= 0);其中retry默认是false,
count默认是3,来自配置文件
5.6.1一系列的初始化活动
5.6.2lb = locateFollowingBlock(startTime, excluded.length > 0 ? excluded : null);//远程调用namenode.addBlock(src,
clientName)
5.6.2.1 if (serverSupportsHdfs630) {
//可以支持Hdfs630,包含excludedNodes
return namenode.addBlock(src, clientName, excludedNodes);
}
转到NameNode端
在LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludedNodes)
5.6.2.1.1 创建excludedNodeList
5.6.2.1.2 locatedBlock = namesystem.getAdditionalBlock(src, clientName, excludedNodeList);
5.6.2.1.2.1 checkFsObjectLimit(); 判断maxFsObjects <= dir.totalInodes() + getBlocksTotal()
5.6.2.1.2.2pendingFile = checkLease(src, clientName);创建一个UnderConstruction的pendingfile
5. 6.2.1.2.2.1 INodeFile file = dir.getFileINode(src);//在dir里找到src对应的节点,这个节点是在create的过
程中被放进去的
5. 6.2.1.2.2.2 checkLease(src, holder, file);//如果file是underconstruction的,不会去map里检查lease
其实上面这个函数,如果不是underconstruction的,会报异常。只有是underconstruction的,
才会正常的执行完,最终只是简单的进行一下类型转换,和下面5. 6.2.1.2.2.3是重复的
5. 6.2.1.2.2.3 return (INodeFileUnderConstruction)file;
5.6.2.1.2.3
fileLength = pendingFile.computeContentSummary().getLength();//计算所有blk.getNumBytes()得到已有的
block的长度之和
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();//这也是一种拿到node的方法
replication = (int)pendingFile.getReplication();
5.6.2.1.2.4 replicator.chooseTarget
.chooseTarget的过程,和clusterMap有关
后面的过程是 Allocate 一个block,并把它记录在INode上
5.6.2.1.2.5. INode[] pathINodes = dir.getExistingPathINodes(src); 得到文件路径中所有path的INode,其中最后一个是新添加的文件对应的INode,状态为under construction
5.6.2.1.2.6 checkLease(src, clientName, pathINodes[inodesLen-1]);//注意上面还有一次checklease
5.6.2.1.2.7 得到最后一个节点,INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
pathINodes[inodesLen - 1];
5.6.2.1.2.8 检查是不是 checkFileProgress
5.6.2.1.2.9 为文件分配block newBlock = allocateBlock(src, pathINodes); (这个不能全部照搬)
5.6.2.1.2.9.1
5.6.2.1.2.9.2里面的GS,对应create时创建的GS b.setGenerationStamp(getGenerationStamp());
5.6.2.1.2.9.3调用FSDirectory.addBlock(String, INode[], Block)
waitForReady();
锁住 rootDir
INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
检查quota限制并更新了占用空间updateCount(inodes, inodes.length-1, 0,
fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
//建立block和file的关联? associate the new list of blocks with this file
namesystem.blocksMap.addINode(block, fileNode);//numbyte是0吗?
BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
fileNode.addBlock(blockInfo);//fileNode的blocks增加了一个元素,这时仍然是
上面这句就是5.6.2.1.2.10里面blocks里已经有这个block的原因
return block;
5.6.2.1.2.9.4
AllocStruct allocStruct = allocBlock(32); b.setLunind(allocStruct.getLunid());
Integer[] blocks = allocStruct.getBlocks();
。。。。。。。。
5.6.2.1.2.9.5 返回block到5.6.2.1.2.9
5.6.2.1.2.10 设置写到哪个datanode上, 这个时候pendingFile 的blocks变量里已经有上面那个block了
pendingFile.setTargets(targets);
5.6.2.1.2.11 for (DatanodeDescriptor dn : targets) {//不太理解用途
dn.incBlocksScheduled();
}
5.6.2.1.2.12 LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);//第三个参数,表示fileoffset
5.6.2.1.2.13 设置写到哪个datanode上, 这个时候blocks变量里已经有上面那个block了
5.6.2.1.2.14 安全检查
5.6.2.1.2.15 返回5.6.2.1.2.12创建的b
返回5.6.2.1.2 namesystem.getAdditionalBlock
返回5.6.2.1 这时候已经返回到客户端
返回5.6.2 locateFollowingBlock
5.6.3
block = lb.getBlock();
accessToken = lb.getBlockToken();
nodes = lb.getLocations();//192.168.0.122:50010
5.6.4 连接到DataNodeList的第一个DataNode上
success = createBlockOutputStream(nodes, clientName, false);//在这个函数里面写数据,发出写入指令
5.6.4.1 InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());//创建和第一个datanode的连接
nodes[0].getName()=192.168.0.122:50010
通过target,建立socket,再通过socket,建立输出流
5.6.4.2 发出写入指令
5.6.4.3 checksum.writeHeader( out );
out.flush();
5.6.4.4 pipelineStatus = blockReplyStream.readShort();//这是一个阻塞的函数,对应DataXceiver writetoblock里replyOut.writeShort(mirrorInStatus);参考 3.1.1.1
5.6.4.5 blockStream = out;//把out赋给blockStream,后面的全都是数据了
result = true; //返回是否写入成功 success
5.6.4.6 处理各种异常,返回result
5.6.5 如果5.6.4返回的success是false,说明创建失败
5.6.5.1 namenode.abandonBlock(block, src, clientName);
5.6.5.2 excludedNodes.add(nodes[errorIndex]);
retry = true;更改while循环的条件,是为了出来连接断掉的情况
5.6.7 跳出循环外,如果success仍然是false,抛出异常
5.6.8 返回nodes到5.6
5.7 以5.6得到的nodes为参数,启动一个ResponseProcessor(nodes)线程
5.8 从标记01处得到的one里面得到buf,如果one不是心跳packet,就从dataQueue中去掉,加入ackQueue
5.9 利用5.8得到的buf,想远端的datanode写数据blockStream.write(buf.array(), buf.position(), buf.remaining())
这个过程,可以激活阻塞的ReadNextPacket函数里的ReadToBuf,返回的nRead,正是这里传入的buf.remaining()
参考3.1.1.4.3.1.5.2
5.10如果one是块里最后一个packet(根据one.lastPacketInBlock来判断),则向远端发送一个结束标志blockStream.writeInt(0)(对应3.1.1.4.4?)
blockStream.flush();
lastPacket = System.currentTimeMillis();
5.11如果one是块里最后一个packet(重新判断了一遍,同5.10)
5.11.1 阻塞等待ackQueue的消息 ackQueue.wait();
5.11.2 等5.11.1通过以后,关闭5.7创建的线程
response.close(); // ignore all errors in Response
response.join() 等待,直到结束
5.11.3 IOUtils.cleanup(LOG, blockStream, blockReplyStream);
nodes = null;
response = null;
blockStream = null;//标明要新建一个block写入流
blockReplyStream = null;
5.11.4 if (progress != null) { progress.progress(); }
5的run方法结束
6 对于blockreciever 的内部类PacketResponder(DN ,blockreciever)来说(创建于3.1.1.4.2):
几个关键的成员变量
//packet waiting for ack
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private volatile boolean running = true;
private Block block;
DataInputStream mirrorIn; // input from downstream datanode
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; //下游节点的数目,非分布式是0 number of downstream datanodes including myself
private BlockReceiver receiver; //这个responder的所有者 The owner of this responder.
private Thread receiverThread; // the thread that spawns this responder
在PacketResponder的run函数里
所有的操作都在一个while循环里,循环条件是 running && datanode.shouldRun && !lastPacketInBlock
在这个循环里
6.1 ackQueue在被放入东西后,可以通过wait阻塞(对应BlockReciever中,recievePacket函数里
((PacketResponder)responder.getRunnable()).enqueue(seqno, lastPacketInBlock); 参考3.1.1.4.3.17 )
6.2通过wait之后,得到一个pkt,因而得到seqno,赋给expected,然后notifyall()
6.3如果numTargets>0,还要很多操作,(暂时不用)
6.4 lastPacketInBlock = pkt.lastPacketInBlock;//判断是不是最后一个包
如果是最后一个包,并且receiver没有finalized 进行关闭文件操作 (lastPacketInBlock && !receiver.finalized)
6.4.1 receiver.close();关闭checksumout和out
6.4.2设置block的真正大小block.setNumBytes(receiver.offsetInBlock);// receiver的offsetInBlock在receivepacket里
设定
6.4.3 datanode.data.finalizeBlock(block);
6.4.3.1 finalizeblockInteranal
从ongoingCreates和VolumeMap里得到临时文件
File dest = null;
dest = v.addBlock(b, f);
FSDataSet$FSDir.addBlock(b,f) (其中,f是tmpfile,返回的时候,就是正式文件了)
首先尝试在不创建子目录的情况下addBlock,File file = addBlock(b, src, false, false);
如果创建失败,则调用创建子目录的addBlock addBlock(b, src, true, true);
在函数addBlock(Block b, File src, boolean createOk, boolean resetIdx)里
先判断if (numBlocks < maxBlocksPerDir) 如果成立,就不用创建新目录
File dest = new File(dir, b.getBlockName());//在正式文件目录,创建一个同名
block文件,得到一个BlockName(),其实是为了得到rename的名称
同理,在得到一个正式文件目录下的meta文件的同名文件名
利用rename,实现block文件和meta文件向正式文件目录的迁移
numBlocks += 1;
返回dest
如果需要创建subdir,就创建maxBlocksPerDir个subdir
new FSDir[maxBlocksPerDir];
然后随机选择一个subdir,继续执行addblock操作(类似递归调用)
返回
返回FSDataSet$FSVolumn.addBlock(b,f)中,
得到metaFile,然后重新计算磁盘使用的空间
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
返回dest = v.addBlock(b, f);
volumeMap.put(b, new DatanodeBlockInfo(v, dest));dest是返回的正式block
ongoingCreates.remove(b);移除ongoingCreates里面的对应信息
返回
返回finalizeblockInteranal
返回到PacketResponder的run函数里
datanode.myMetrics.incrBlocksWritten();
datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);//通知namenode收到了一个块
receivedBlockList.add(block);
delHints.add(delHint);
receivedBlockList.notifyAll();//receivedBlockList是需要关注的
把offset设置为0
开始创建应答
replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
PipelineAck replyAck = new PipelineAck(expected, replies);//expected 其实是一个seqno
然后通过replyOut向前一个节点发送
这个应答应该被DFSClient$DFSOutPutStream$ResponseProcessor 里面的run方法接受到
run循环继续运行,正常情况应该到块结束
内存中的文件节点,是在create的过程中被加进去的
注意INodeFile的移位操作
DatanodeDescriptor clientNode = 得到node的方式
host2DataNodeMap.getDatanodeByHost(clientMachine);
LunStatus luns = lunid2blockBitMap.get(block.getLunind());
luns.setTailposition(luns.getTailposition() + block.getNumBytes());//更新尾部的位置,为了下次allocateBlock作准备
if (block != storedBlock) {
if (block.getNumBytes() >= 0) {
long cursize = storedBlock.getNumBytes();
INodeFile file = storedBlock.getINode();
if (cursize == 0) {
storedBlock.setNumBytes(block.getNumBytes());
addStoredBlock(FsnameSystem)在系统的两个位置被调用blockReceived(DatanodeID, Block, String) 和processReport(DatanodeID, BlockListAsLongs)
前者是在offerService函数中被远程调用的namenode.blockReceived
后者同样是被offerService中间接调用 namenode.blockReport 返回DatanodeCommand
namenode.complete 上追溯可以到DFSOutPutStream.close, 这个函数在系统里有50个调用处
主要关注两个
datanode上blockreciever里的close
Dfsclient 里,leaseChecker的 close
8在addStoredBlock里
8.1 从fsnamesys里,拿到storedBlock BlockInfo storedBlock = blocksMap.getStoredBlock(block);
8.1.1正常情况下storedBlock是不应该为null的,非正常情况下的处理很复杂,需仔细研究
8.1.2在不为null的情况下
8.1.2.1 可以设置一些lunid或者lunoffset
8.1.2.2 拿到对应的fileINode INodeFile fileINode = storedBlock.getINode();//StoredBlock.inode.blocks[0].inode
8.1.2.3 向blocklist里面insert,实际上是插入到triple链表的头上 boolean added = node.addBlock(storedBlock); //回
到这个点,看看node->blocklist->triplets
8.1.2.4 判断这个将要报告的block,是不是这个underconstruction file的当前的最后一个block
最后得到一个boolean值:blockUnderConstruction = last.equals(storedBlock);
8.1.2.5 如果block != storedBlock,要作一些处理 需仔细研究
8.1.2.6 如果8.1.2.3 的added 是真,判断一下是否是在安全模式,如果不是安全模式,记录一条log(原因在程序的注释里)
8.1.2.7 计算一下numCurrentReplica ,然后更新一下和safemode相关的安全块数(需要继续研究)
8.1.2.8 如果8.1.2.4的blockUnderConstruction为真,
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
cons.addTarget(node);
return block;
8.1.2.9 如果8.1.2.8的条件没有满足,还有很多操作,需要继续研究
DFSClient$DFSOutputStream.closeInternal() line: 4035
DFSClient$DFSOutputStream.close() line: 3952
FSDataOutputStream$PositionCache.close() line: 61
FSDataOutputStream.close() line: 86
IOUtils.copyBytes(InputStream, OutputStream, int, boolean) line: 50
IOUtils.copyBytes(InputStream, OutputStream, Configuration, boolean) line: 100
FileUtil.copy(FileSystem, Path, FileSystem, Path, boolean, boolean, Configuration) line: 230
FileUtil.copy(FileSystem, Path[], FileSystem, Path, boolean, boolean, Configuration) line: 176
DistributedFileSystem(FileSystem).copyFromLocalFile(boolean, boolean, Path[], Path) line: 1183
FsShell.copyFromLocal(Path[], String) line: 139
FsShell.run(String[]) line: 2157
ToolRunner.run(Configuration, Tool, String[]) line: 65
ToolRunner.run(Tool, String[]) line: 79
FsShell.main(String[]) line: 2311
{
copyBytes(in, out, buffSize);
} finally {
if(close) {
out.close(); //这句里面,调用了namenode.complete
in.close();
}
RawLocalFileSystem.getFileStatus(Path) line: 393
LocalFileSystem(FilterFileSystem).getFileStatus(Path) line: 251
public FileStatus getFileStatus(Path f) throws IOException {
File path = pathToFile(f);
if (path.exists()) {
return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
} else {
throw new FileNotFoundException( "File " + f + " does not exist.");
}
}
Create操作
1.String clientMachine = getClientMachine();
2.namesystem.startFile(src,new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
null, masked),clientName, clientMachine, overwrite, createParent, replication, blockSize);
开始startFileInternal函数,写文件的重要过程
判断目标是否以路径的形式存在
clientNode:override-toString-DNDES:ubuntu|/default-rack|/default-rack|blockList实际上是个blockinfo:|null 参考5.6.2.1.2.3
全局的genstamp:1003
创建一个INodeFileUnderConstruction
开始调用leaseManager.addLease
结束调用leaseManager.addLease
put操作,最终会调用IOutils.copyBytes(in,out,buffersize) in.readbuf里面调用了read1,
如果读到的长度大于0,就把读到的数据,写入到一个输出流里面,接着继续in.readbuf,继续写入到输出流,如此循环。正常情况下,一次readBuf可以读4096字节,然后把这些字节传给out.write(buf, 0, bytesRead)后,则要循环8次,每次读入512字节数据,并生成4字节checksum
in.readbuf 调用 FSInputChecker(继承了FSInputStream)的read(byte[] b, int off, int len)函数
里面循环调用read1,如果read1返回 -1,表示读到了结尾
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;//注意packet的切换
}
在read1里面,如果len>=buf.length,就调用readChecksumChunk(b, off, len); 返回读入的字节数nRead, 这样len每次就减去nread
在readChecksumChunk(byte b[], int off, int len) 中()
ChecksumFileSystem$ChecksumFSInputChecker(FSInputChecker).readChecksumChunk(byte[], int, int)
调用read = readChunk(chunkPos, b, off, len, checksum);//read 可以是4096 也可以是2 和 -1
ChecksumFileSystem$ChecksumFSInputChecker.readChunk里面
int nread = readFully(datas, buf, offset, len);
FSInputChecker. readFully里面
for (;;) {
int nread = stm.read(buf, offset + n, len - n);
......................
}
最终调用RawLocalFileSystem$TrackingFileInputStream.read(byte[], int, int)
写入到输出流的过程如下
会调用FSDataOutPutStream$Positionache$write
循环调用write1 (DFSClient$DFSOutputStream(FSOutputSummer).write1(byte[], int, int))
1.先计算crc值
2.调用writeChecksumChunk
2.1 生成一个Checksum数组 4byte
2.2 writeChunk 传入buffer offset length checksum (要注意写 -1 那次)writeChunk每次写入512个字节
2.2.1 一系列检查
2.2.2 如果currentPacket是null(currentPacket在2.4.1.2被设置成null) 创建一个新的Packet,传入packetSize chunksperPacket byteCurBlock (65557,12, 0)
设置了一堆值 -----返回
2.3 写入系列操作
2.3.1向currentpacket 写入数据,先writeCheckSum 再writeData,都是用arraycopy拷贝到buf中
修改numChuncks和bytesCurBlock
2.3.3执行完2.3.1的写入,要把修改两个参数
currentPacket.numChunks++;
bytesCurBlock += len;//每次写入正常就是512
2.4 相关的临界操作
2.4.1根据2.3.3
修改过得参数作判断if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize)
2.4.1.1 特别的,如果满足第二个条件,表明一个块已经写满了
要更新如下的参数
currentPacket.lastPacketInBlock = true;//标明这是最后一个Packet
bytesCurBlock = 0; lastFlushOffset = 0;
2.4.1.2 无论是满足第一个条件,还是第二个条件,都要调用
enqueueCurrentPacket();
dataQueue.addLast(currentPacket);
dataQueue.notifyAll();**** 在DataStreamer的run函数里,会调用one = dataQueue.getFirst();标记01
lastQueuedSeqno = currentPacket.seqno;
currentPacket = null;
处理是否写满一个块 返回2,结束一个writeCheck
返回write1,返回length//512 (用作write1,修改传入参数用)
如果write结束,Position + = len incCount 更新 written
3转到DataNode上
3.1DataXReciever的run函数,读DFSClient通过createBlockOutputStream传过来的流,读到OP,如果是writetoblock指令,就调用DataXceiver的writeblock方法(每次这个函数被调用完,就会六大关闭)
3.1.1这个函数里面,
3.1.1.1首先通过DataXceiver的成员变量Socket s,创建一个DataOutputStream replyout,它对应的是createBlockOutputStream里面的blockReplyStream DataXceiver writetoblock里replyOut.writeShort(mirrorInStatus)对应 Client端 pipelineStatus = blockReplyStream.readShort();参考5.6.4.4
3.1.1.2 创建了一个blockReciever,在创建的过程中,Streams = datanode.data.writetoblock(......),这个stream里包含out和checksumout
writetoblock(FSDataSet)的过程:
3.1.1.2.1先设blocksize(调用 b.getNumBytes ) 然后检查ongoingcreates,如果有进程在创建这个block,就停掉它。
3.1.1.2.2 然后得到FSVolumn, 调用createTmpFile创建临时文件(在hdfs/data/blocksbeingWritt........下面)
然后volumeMap.put(b,new DataNodeBlockInfo(v,f))
ongoingCreates.put(b,new ActiveFile(f,threads))
3.1.1.2.3 调用File metafile = getMetaFile(f, b);//里面调用getGS,加上了GS 创建.meta文件
3.1.1.2.4 调用 createBlockWriteStreams( f , metafile)(FSDataSet); 这里面用FileOutPutStream,封装了RandomAccessFile
return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
null /*new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() )*/);
返回
从writetoblock返回后,得到一个Streams,通过Stream,可以给out和checksumout赋值(在这里可以做一些对应lun的修改)返回到创建blockReciever的地方
3.1.1.3 结束创建blockReciever后,replyOut会写两个东西,client端会收到(参考3.1.1.1)
replyOut.writeShort(mirrorInStatus);//0 Text.writeString(replyOut, firstBadLink);//firstBadLink =“”
3.1.1.4 blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets.length);//会长时间阻塞
这是一个有可能会长时间阻塞的函数,如果里面的流没有读到数据的话,这里面除了replyout,都是null
数据来自客户端,参考5.9,实际上,5.9的数据,被datanode接收到,是从3.1 一路传到这里的。
3.1.1.4.1 调用BlockMetadataHeader.writeHeader(checksumOut, checksum);//已经被注销掉了
主要写了这样一些东西
3.1.1.4.1.1 out.writeShort(header.getVersion());
3.1.1.4.1.2 header.getChecksum().writeHeader(out);
3.1.1.4.1.2.1 out.writeByte( type );//1
3.1.1.4.1.2.2 out.writeInt( bytesPerChecksum );//512
3.1.1.4.2 创建一个PacketResponder线程,并启动
new PacketResponder(this, block, mirrIn, replyOut, numTargets,Thread.currentThread())
3.1.1.4.3 再继续,blockReciever开始不停地调用receivePacket
while (receivePacket() > 0) {}//会因为blockStream.writeInt(0) 而结束(对应3.1.1.4.4)
3.1.1.4.3.1 payloadLen = readNextPacket();//1036
3.1.1.4.3.1.1 计算 chunksize = 516 (512 + 4)
3.1.1.4.3.1.2 计算 chunksperPacket(65536 -21 -4 + 516 -1)/516 约等于127
3.1.1.4.3.1.3 然后分配buf,容量为 21+4 +127*516 = 65557
.................................................
3.1.1.4.3.1.5 然后,在readToBuf(-1) 这个地方,开始读数据
......................................................
3.1.1.4.3.1.5.2 int nRead = in.read(buf.array(), buf.limit(), toRead);//如果输入流没有东西会阻塞,3.1.1.4的阻塞,就是在这里引起的,解除这里的阻塞,来自客户端发送的数据,参考5.9
3.1.1.4.3.1.5.3 bufRead = buf.limit() + nRead;//0 + 1057
3.1.1.4.3.1.5.4 buf.limit(bufRead);//表示这个位置之前的东西是有效的
3.1.1.4.3.1.5.5 return nRead;
3.1.1.4.3.1.6
3.1.1.4.3.2 buf.mark();
//read the header
buf.getInt(); // packet length 前4个 bytes
offsetInBlock = buf.getLong(); //得到当前packet,在block里面的offset 8个字节
long seqno = buf.getLong(); // get seqno 8个字节
boolean lastPacketInBlock = (buf.get() != 0);//buf读了一个字节 表明读到了设置的标记0了(对应5.10)
int endOfHeader = buf.position();//21(4+8+8+1) 原来21是这么来的?
buf.reset();//buf.position回归到mark的位置,回归的原因是3.1.1.4.6
3.1.1.4. 3.3 setBlockPosition(offsetInBlock);//FIXME offsetInBlock 来自前面的buf.getLong()
3.1.1.4. 3.4 先把packet写入mirror(只有一个副本不走这里)
3.1.1.4. 3.5 跳过21个字节,利用buf.position(endOfHeader); //21
然后读入一个int,得到的是文件的长度len(满的应该是65024)
3.1.1.4.3.7 先修改offsetInblock offsetInBlock += len;//len = 65024
3.1.1.4.3.8 计算checkSumOff = 508 ((65024 + 512 -1)/512) * 4
3.1.1.4.3.9 计算dataOff
checksumOff = buf.position();//25
int dataOff = checksumOff + checksumLen;//508 + 25
3.1.1.4.3.10 备份一份buff 到变量pktBuf
3.1.1.4.3.11 把buff的position设置成limit,使前面的数据不会被写入(为了下次写入?) buf.position(buf.limit());
3.1.1.4.3.12 调用verifyChunks 需要传入的参数 pktBuf(3.1.1.4.7.4),dataOff(3.1.1.4.7.3),len(3.1.1.4.6),和
checksumoff(3.1.1.4.7.2)
verifyChunks里,以512字节为长度,循环计算crc32校验,比如,如果这个文件有1024个字节,那么,数据前面crc占据的长度,就是1024/512 *4 = 8字节
3.1.1.4.3.13 如果还没有finalized,就写入磁盘
//写数据到磁盘!!! 底层调用 临时文件里的block里面的内容就增加了
out.write(pktBuf, dataOff, len);pktBuf这个buffer里面,dataOff后面,len个字节长度里的内容,全都是数据,
写入out
3.1.1.4.3.14 如果checksumOut不是null checksumOut.write(pktBuf, checksumOff, checksumLen);
把pktBuf中,checksumOff开始,checksumLen长度的部分,写入checksumOut
通过3.1.1.4.3.13 和3.1.1.4.3.14,可以看到,数据和checksum都是连续存放的,数据紧接着校验来存放
3.1.1.4.3.15 调用flush,里面包括out的flush和checksum的flush
3.1.1.4.3.16 之后更新datanode上的block的长度 datanode.data.setVisibleLength(block, offsetInBlock);//FIXME
从ongoingCreate里,拿到activeFile,更新之
3.1.1.4.3.17 ((PacketResponder)responder.getRunnable()).enqueue(seqno, lastPacketInBlock);
把seqno和lastPacketInBlock 加入到ackQueue(这会使3.1.1.4.2 创建的PacketResponder线程的run方法解除阻塞,参考3.1.1.4.2 参考6.1 )
3.1.1.4.3.18 返回 payloadLen // 65536 应该是数据加 校验的长度
返回到3.1.1.4.3的while (receivePacket() > 0)循环
如果返回的payloadLen>0 ,继续while循环
3.1.1.4.4 mirrorOut.writeInt(0); //标记块的结尾 mirrorOut.flush();
3.1.1.4.5 关闭responder(3.1.1.4.2创建)
3.1.1.4.6 block.setNumBytes(offsetInBlock);设置block的真实大小
3.1.1.4.7 datanode.data.finalizeBlock(block);和6.4.3是相同的调用,详情在那里看
3.1.1.5 datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
3.1.1.5.1 receivedBlockList.add(block);
3.1.1.5.2 delHints.add(delHint);
3.1.1.6 datanode.blockScanner.addBlock(block);
从3.1.1 writeblock 返回到3.1
回到DataXReciever的run里继续循环
在recievepacket里面
读出前面那21个字节,得到offsetInBlock,
5 在DataStreamer线程里DFSClient$DFSOutputStream$DataStreamer.run()
5.6 在标记01的后面nodes = nextBlockOutputStream(src); src是在DFSOutputStream的构造函数中传入的
这个位置是nextBlockOutputStream的唯一调用,也就是每次分配块,都是在这里弄的
nextBlockOutputStream基本上所有的操作都在一个有限次的while循环里,(retry && --count >= 0);其中retry默认是false,
count默认是3,来自配置文件
5.6.1一系列的初始化活动
5.6.2lb = locateFollowingBlock(startTime, excluded.length > 0 ? excluded : null);//远程调用namenode.addBlock(src,
clientName)
5.6.2.1 if (serverSupportsHdfs630) {
//可以支持Hdfs630,包含excludedNodes
return namenode.addBlock(src, clientName, excludedNodes);
}
转到NameNode端
在LocatedBlock addBlock(String src, String clientName, DatanodeInfo[] excludedNodes)
5.6.2.1.1 创建excludedNodeList
5.6.2.1.2 locatedBlock = namesystem.getAdditionalBlock(src, clientName, excludedNodeList);
5.6.2.1.2.1 checkFsObjectLimit(); 判断maxFsObjects <= dir.totalInodes() + getBlocksTotal()
5.6.2.1.2.2pendingFile = checkLease(src, clientName);创建一个UnderConstruction的pendingfile
5. 6.2.1.2.2.1 INodeFile file = dir.getFileINode(src);//在dir里找到src对应的节点,这个节点是在create的过
程中被放进去的
5. 6.2.1.2.2.2 checkLease(src, holder, file);//如果file是underconstruction的,不会去map里检查lease
其实上面这个函数,如果不是underconstruction的,会报异常。只有是underconstruction的,
才会正常的执行完,最终只是简单的进行一下类型转换,和下面5. 6.2.1.2.2.3是重复的
5. 6.2.1.2.2.3 return (INodeFileUnderConstruction)file;
5.6.2.1.2.3
fileLength = pendingFile.computeContentSummary().getLength();//计算所有blk.getNumBytes()得到已有的
block的长度之和
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();//这也是一种拿到node的方法
replication = (int)pendingFile.getReplication();
5.6.2.1.2.4 replicator.chooseTarget
.chooseTarget的过程,和clusterMap有关
后面的过程是 Allocate 一个block,并把它记录在INode上
5.6.2.1.2.5. INode[] pathINodes = dir.getExistingPathINodes(src); 得到文件路径中所有path的INode,其中最后一个是新添加的文件对应的INode,状态为under construction
5.6.2.1.2.6 checkLease(src, clientName, pathINodes[inodesLen-1]);//注意上面还有一次checklease
5.6.2.1.2.7 得到最后一个节点,INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
pathINodes[inodesLen - 1];
5.6.2.1.2.8 检查是不是 checkFileProgress
5.6.2.1.2.9 为文件分配block newBlock = allocateBlock(src, pathINodes); (这个不能全部照搬)
5.6.2.1.2.9.1
5.6.2.1.2.9.2里面的GS,对应create时创建的GS b.setGenerationStamp(getGenerationStamp());
5.6.2.1.2.9.3调用FSDirectory.addBlock(String, INode[], Block)
waitForReady();
锁住 rootDir
INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
检查quota限制并更新了占用空间updateCount(inodes, inodes.length-1, 0,
fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
//建立block和file的关联? associate the new list of blocks with this file
namesystem.blocksMap.addINode(block, fileNode);//numbyte是0吗?
BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
fileNode.addBlock(blockInfo);//fileNode的blocks增加了一个元素,这时仍然是
上面这句就是5.6.2.1.2.10里面blocks里已经有这个block的原因
return block;
5.6.2.1.2.9.4
AllocStruct allocStruct = allocBlock(32); b.setLunind(allocStruct.getLunid());
Integer[] blocks = allocStruct.getBlocks();
。。。。。。。。
5.6.2.1.2.9.5 返回block到5.6.2.1.2.9
5.6.2.1.2.10 设置写到哪个datanode上, 这个时候pendingFile 的blocks变量里已经有上面那个block了
pendingFile.setTargets(targets);
5.6.2.1.2.11 for (DatanodeDescriptor dn : targets) {//不太理解用途
dn.incBlocksScheduled();
}
5.6.2.1.2.12 LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);//第三个参数,表示fileoffset
5.6.2.1.2.13 设置写到哪个datanode上, 这个时候blocks变量里已经有上面那个block了
5.6.2.1.2.14 安全检查
5.6.2.1.2.15 返回5.6.2.1.2.12创建的b
返回5.6.2.1.2 namesystem.getAdditionalBlock
返回5.6.2.1 这时候已经返回到客户端
返回5.6.2 locateFollowingBlock
5.6.3
block = lb.getBlock();
accessToken = lb.getBlockToken();
nodes = lb.getLocations();//192.168.0.122:50010
5.6.4 连接到DataNodeList的第一个DataNode上
success = createBlockOutputStream(nodes, clientName, false);//在这个函数里面写数据,发出写入指令
5.6.4.1 InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());//创建和第一个datanode的连接
nodes[0].getName()=192.168.0.122:50010
通过target,建立socket,再通过socket,建立输出流
5.6.4.2 发出写入指令
5.6.4.3 checksum.writeHeader( out );
out.flush();
5.6.4.4 pipelineStatus = blockReplyStream.readShort();//这是一个阻塞的函数,对应DataXceiver writetoblock里replyOut.writeShort(mirrorInStatus);参考 3.1.1.1
5.6.4.5 blockStream = out;//把out赋给blockStream,后面的全都是数据了
result = true; //返回是否写入成功 success
5.6.4.6 处理各种异常,返回result
5.6.5 如果5.6.4返回的success是false,说明创建失败
5.6.5.1 namenode.abandonBlock(block, src, clientName);
5.6.5.2 excludedNodes.add(nodes[errorIndex]);
retry = true;更改while循环的条件,是为了出来连接断掉的情况
5.6.7 跳出循环外,如果success仍然是false,抛出异常
5.6.8 返回nodes到5.6
5.7 以5.6得到的nodes为参数,启动一个ResponseProcessor(nodes)线程
5.8 从标记01处得到的one里面得到buf,如果one不是心跳packet,就从dataQueue中去掉,加入ackQueue
5.9 利用5.8得到的buf,想远端的datanode写数据blockStream.write(buf.array(), buf.position(), buf.remaining())
这个过程,可以激活阻塞的ReadNextPacket函数里的ReadToBuf,返回的nRead,正是这里传入的buf.remaining()
参考3.1.1.4.3.1.5.2
5.10如果one是块里最后一个packet(根据one.lastPacketInBlock来判断),则向远端发送一个结束标志blockStream.writeInt(0)(对应3.1.1.4.4?)
blockStream.flush();
lastPacket = System.currentTimeMillis();
5.11如果one是块里最后一个packet(重新判断了一遍,同5.10)
5.11.1 阻塞等待ackQueue的消息 ackQueue.wait();
5.11.2 等5.11.1通过以后,关闭5.7创建的线程
response.close(); // ignore all errors in Response
response.join() 等待,直到结束
5.11.3 IOUtils.cleanup(LOG, blockStream, blockReplyStream);
nodes = null;
response = null;
blockStream = null;//标明要新建一个block写入流
blockReplyStream = null;
5.11.4 if (progress != null) { progress.progress(); }
5的run方法结束
6 对于blockreciever 的内部类PacketResponder(DN ,blockreciever)来说(创建于3.1.1.4.2):
几个关键的成员变量
//packet waiting for ack
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private volatile boolean running = true;
private Block block;
DataInputStream mirrorIn; // input from downstream datanode
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; //下游节点的数目,非分布式是0 number of downstream datanodes including myself
private BlockReceiver receiver; //这个responder的所有者 The owner of this responder.
private Thread receiverThread; // the thread that spawns this responder
在PacketResponder的run函数里
所有的操作都在一个while循环里,循环条件是 running && datanode.shouldRun && !lastPacketInBlock
在这个循环里
6.1 ackQueue在被放入东西后,可以通过wait阻塞(对应BlockReciever中,recievePacket函数里
((PacketResponder)responder.getRunnable()).enqueue(seqno, lastPacketInBlock); 参考3.1.1.4.3.17 )
6.2通过wait之后,得到一个pkt,因而得到seqno,赋给expected,然后notifyall()
6.3如果numTargets>0,还要很多操作,(暂时不用)
6.4 lastPacketInBlock = pkt.lastPacketInBlock;//判断是不是最后一个包
如果是最后一个包,并且receiver没有finalized 进行关闭文件操作 (lastPacketInBlock && !receiver.finalized)
6.4.1 receiver.close();关闭checksumout和out
6.4.2设置block的真正大小block.setNumBytes(receiver.offsetInBlock);// receiver的offsetInBlock在receivepacket里
设定
6.4.3 datanode.data.finalizeBlock(block);
6.4.3.1 finalizeblockInteranal
从ongoingCreates和VolumeMap里得到临时文件
File dest = null;
dest = v.addBlock(b, f);
FSDataSet$FSDir.addBlock(b,f) (其中,f是tmpfile,返回的时候,就是正式文件了)
首先尝试在不创建子目录的情况下addBlock,File file = addBlock(b, src, false, false);
如果创建失败,则调用创建子目录的addBlock addBlock(b, src, true, true);
在函数addBlock(Block b, File src, boolean createOk, boolean resetIdx)里
先判断if (numBlocks < maxBlocksPerDir) 如果成立,就不用创建新目录
File dest = new File(dir, b.getBlockName());//在正式文件目录,创建一个同名
block文件,得到一个BlockName(),其实是为了得到rename的名称
同理,在得到一个正式文件目录下的meta文件的同名文件名
利用rename,实现block文件和meta文件向正式文件目录的迁移
numBlocks += 1;
返回dest
如果需要创建subdir,就创建maxBlocksPerDir个subdir
new FSDir[maxBlocksPerDir];
然后随机选择一个subdir,继续执行addblock操作(类似递归调用)
返回
返回FSDataSet$FSVolumn.addBlock(b,f)中,
得到metaFile,然后重新计算磁盘使用的空间
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
返回dest = v.addBlock(b, f);
volumeMap.put(b, new DatanodeBlockInfo(v, dest));dest是返回的正式block
ongoingCreates.remove(b);移除ongoingCreates里面的对应信息
返回
返回finalizeblockInteranal
返回到PacketResponder的run函数里
datanode.myMetrics.incrBlocksWritten();
datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);//通知namenode收到了一个块
receivedBlockList.add(block);
delHints.add(delHint);
receivedBlockList.notifyAll();//receivedBlockList是需要关注的
把offset设置为0
开始创建应答
replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
PipelineAck replyAck = new PipelineAck(expected, replies);//expected 其实是一个seqno
然后通过replyOut向前一个节点发送
这个应答应该被DFSClient$DFSOutPutStream$ResponseProcessor 里面的run方法接受到
run循环继续运行,正常情况应该到块结束
内存中的文件节点,是在create的过程中被加进去的
注意INodeFile的移位操作
DatanodeDescriptor clientNode = 得到node的方式
host2DataNodeMap.getDatanodeByHost(clientMachine);
LunStatus luns = lunid2blockBitMap.get(block.getLunind());
luns.setTailposition(luns.getTailposition() + block.getNumBytes());//更新尾部的位置,为了下次allocateBlock作准备
if (block != storedBlock) {
if (block.getNumBytes() >= 0) {
long cursize = storedBlock.getNumBytes();
INodeFile file = storedBlock.getINode();
if (cursize == 0) {
storedBlock.setNumBytes(block.getNumBytes());
addStoredBlock(FsnameSystem)在系统的两个位置被调用blockReceived(DatanodeID, Block, String) 和processReport(DatanodeID, BlockListAsLongs)
前者是在offerService函数中被远程调用的namenode.blockReceived
后者同样是被offerService中间接调用 namenode.blockReport 返回DatanodeCommand
namenode.complete 上追溯可以到DFSOutPutStream.close, 这个函数在系统里有50个调用处
主要关注两个
datanode上blockreciever里的close
Dfsclient 里,leaseChecker的 close
8在addStoredBlock里
8.1 从fsnamesys里,拿到storedBlock BlockInfo storedBlock = blocksMap.getStoredBlock(block);
8.1.1正常情况下storedBlock是不应该为null的,非正常情况下的处理很复杂,需仔细研究
8.1.2在不为null的情况下
8.1.2.1 可以设置一些lunid或者lunoffset
8.1.2.2 拿到对应的fileINode INodeFile fileINode = storedBlock.getINode();//StoredBlock.inode.blocks[0].inode
8.1.2.3 向blocklist里面insert,实际上是插入到triple链表的头上 boolean added = node.addBlock(storedBlock); //回
到这个点,看看node->blocklist->triplets
8.1.2.4 判断这个将要报告的block,是不是这个underconstruction file的当前的最后一个block
最后得到一个boolean值:blockUnderConstruction = last.equals(storedBlock);
8.1.2.5 如果block != storedBlock,要作一些处理 需仔细研究
8.1.2.6 如果8.1.2.3 的added 是真,判断一下是否是在安全模式,如果不是安全模式,记录一条log(原因在程序的注释里)
8.1.2.7 计算一下numCurrentReplica ,然后更新一下和safemode相关的安全块数(需要继续研究)
8.1.2.8 如果8.1.2.4的blockUnderConstruction为真,
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
cons.addTarget(node);
return block;
8.1.2.9 如果8.1.2.8的条件没有满足,还有很多操作,需要继续研究
DFSClient$DFSOutputStream.closeInternal() line: 4035
DFSClient$DFSOutputStream.close() line: 3952
FSDataOutputStream$PositionCache.close() line: 61
FSDataOutputStream.close() line: 86
IOUtils.copyBytes(InputStream, OutputStream, int, boolean) line: 50
IOUtils.copyBytes(InputStream, OutputStream, Configuration, boolean) line: 100
FileUtil.copy(FileSystem, Path, FileSystem, Path, boolean, boolean, Configuration) line: 230
FileUtil.copy(FileSystem, Path[], FileSystem, Path, boolean, boolean, Configuration) line: 176
DistributedFileSystem(FileSystem).copyFromLocalFile(boolean, boolean, Path[], Path) line: 1183
FsShell.copyFromLocal(Path[], String) line: 139
FsShell.run(String[]) line: 2157
ToolRunner.run(Configuration, Tool, String[]) line: 65
ToolRunner.run(Tool, String[]) line: 79
FsShell.main(String[]) line: 2311
{
copyBytes(in, out, buffSize);
} finally {
if(close) {
out.close(); //这句里面,调用了namenode.complete
in.close();
}
RawLocalFileSystem.getFileStatus(Path) line: 393
LocalFileSystem(FilterFileSystem).getFileStatus(Path) line: 251
public FileStatus getFileStatus(Path f) throws IOException {
File path = pathToFile(f);
if (path.exists()) {
return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
} else {
throw new FileNotFoundException( "File " + f + " does not exist.");
}
}
相关推荐
"Hadoop集群安装笔记" Hadoop集群安装笔记是一篇详细的安装指南,旨在帮助新手快速搭建Hadoop学习环境。以下是该笔记中的重要知识点: Hadoop集群安装目录 在安装Hadoop集群之前,需要准备好安装环境。安装环境...
《传智播客Hadoop资料文档和笔记》是一份针对Hadoop技术的综合学习资源,由知名教育机构传智播客提供。这份资料涵盖了Hadoop生态系统的各个方面,旨在帮助学习者深入理解并掌握这一分布式计算框架的核心概念和技术。...
5. "hadoop第一天.ppt" - 这应是第一天课程的完整PPT,可能详细介绍了Hadoop的起源、架构、工作原理等内容。 6. "hadoop2.2.0伪分布式搭建.txt" - 这可能是一个文本指南,指导如何在单机环境下设置Hadoop的伪分布式...
【标题】"传智黑马赵星老师hadoop七天课程资料笔记-第七天(全)" 涵盖了Hadoop技术栈的重要知识点,这是一份关于Hadoop学习的详尽资料,特别关注了课程的最后一天内容。在Hadoop的学习过程中,第七天通常会涉及到系统...
- Hive:为数据仓库设计,提供了SQL方言HiveQL,允许用户编写类似SQL的查询语句来操作Hadoop中的数据。 - Sqoop:是一个开源工具,用于在Hadoop和关系数据库之间高效地传输大量数据。 - Oozie:是一个用于管理Hadoop...
【标题】"Hadoop之HBase学习笔记"主要聚焦于Hadoop生态中的分布式数据库HBase。HBase是一个基于Google Bigtable理念设计的开源NoSQL数据库,它运行在Hadoop之上,提供高性能、高可靠性以及可水平扩展的数据存储能力...
### Hadoop基础知识与实战应用详解 #### 一、Hadoop概览 **1.1 什么是Hadoop?** ...以上内容详细介绍了Hadoop的基本概念、部署方法以及常见的运维操作,希望能帮助读者更好地理解和使用Hadoop。
【Hadoop与HBase自学笔记】是一篇详细记录在Windows环境下搭建Hadoop和HBase分布式环境的教程。本文主要分为六个部分,涵盖了从基础环境准备到集群开发的全过程。 1. **安装JDK**:首先,你需要安装Java ...
### Hadoop Hive 入门学习笔记 #### 一、Hadoop Hive 概述 Hadoop Hive 是一个基于 Hadoop 的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,使得 Hadoop 上的数据可以被...
本笔记将深入探讨如何搭建Hadoop HA环境,并分享配置文件及其详细解读。 首先,我们要理解Hadoop HA的基本概念。HA主要涉及到两个关键组件:NameNode和ResourceManager。NameNode是HDFS的元数据管理节点,而...
### Hadoop从安装到配置详解 #### 一、概述 Hadoop是一款开源软件框架,用于分布式存储和处理大型数据集。它能够通过集群中的多台计算机来存储和处理大规模的数据,具有高可靠性、高扩展性和成本效益等优势。本文将...
1. **Hadoop day01.xmind**:可能包含了Hadoop的安装配置、环境搭建,以及HDFS的基本操作,如上传、下载文件,理解HDFS的文件块和副本策略。 2. **Hadoop day02.xmind**:可能进一步讲解MapReduce的工作原理,包括...
综上所述,Hadoop平台的搭建涉及到对大数据概念的理解,对Hadoop生态圈内各组件的认识,对Hadoop历史发展和技术演进的把握,以及对Linux操作系统和shell编程的熟练应用。这些都是构建和管理一个稳定高效的大数据平台...
这个压缩包“hadoop笔记打包下载(想学hadoop不下载后悔)”显然是一个丰富的学习资源集合,涵盖了Hadoop生态系统的多个重要组成部分。下面将详细解释这些关键知识点。 1. **Hadoop HDFS(Hadoop Distributed File ...
自己整理的hadoop学习笔记,很详尽 很真实。linux操作终端下遇到的各种Hadoop常见问题 解决方案
标题和描述中提到的是“传智黑马赵星老师hadoop七天课程资料笔记-第二天(全)”,这表明这是一个关于Hadoop技术的深度学习资源,主要聚焦于赵星老师的Hadoop教学课程中的第二天内容。通常,这样的课程会涵盖Hadoop的...
本学习笔记涵盖了Hadoop 1.0和2.0两个主要版本,旨在帮助读者全面理解Hadoop的核心概念、架构以及实际操作。 在Hadoop 1.0中,核心组件主要包括HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一种...
总的来说,这篇学习笔记展示了如何利用Hadoop的MapReduce框架处理大数据问题,特别是寻找最大值这类聚合操作。通过这个例子,我们可以理解MapReduce的核心思想,并学习如何编写Java代码来实现分布式计算任务。这对于...
压缩包“Hadoop大数据开发教程笔记软件.zip”包含了一份详实的Hadoop学习资源,特别是针对基于Hadoop的大数据开发基础进行讲解。 Hadoop是一个开源框架,由Apache软件基金会维护,它设计用于处理和存储大量数据。其...