DFSClient.RemoteBlockReader.newBlockReader()
public static BlockReader newBlockReader( Socket sock, String file, long blockId, Token<BlockTokenIdentifier> accessToken, long genStamp, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))); // write the header. 使用OutputStream发起读数据块的请求头报文 out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // ① out.write( DataTransferProtocol.OP_READ_BLOCK); // ② out.writeLong( blockId ); // ③ out.writeLong( genStamp ); // ④ out.writeLong( startOffset ); // ⑤ out.writeLong( len ); // ⑥ Text.writeString(out, clientName); // ⑦ accessToken.write(out); // ⑧ out.flush(); // Get bytes in block, set streams 使用InputStream接收DataNode传回来的数据 DataInputStream in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize)); short status = in.readShort(); // [1] 读取状态信息 if (status != DataTransferProtocol.OP_STATUS_SUCCESS) { } // throw Exception... DataChecksum checksum = DataChecksum.newDataChecksum( in ); // [2-1] checksumHeader(校验类型和校验块大小) long firstChunkOffset = in.readLong(); // [2-2] Read the first chunk offset第一个校验块的起始位置 if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) { throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file); } return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock); }
DataXceiver.readBlock()
/** Read a block from the disk. 读取本地磁盘上的数据块, 用于发送给客户端 * @param in The stream to read from 输入流,客户端发送的请求头数据, 用于读取解析数据构造BlockSender */ private void readBlock(DataInputStream in) throws IOException { // 1. Read in the header 读取客户端发送的请求头信息 // ① ② 在DataXceiver的run方法已经读取过了,然后分发到不同的子程序处理 long blockId = in.readLong(); // ③ 8bytes的blockId. Block block = new Block( blockId, 0 , in.readLong()); // ④ 8bytes的数据块版本号 long startOffset = in.readLong(); // ⑤ 8bytes的startOffset开始读取的Block的偏移量 long length = in.readLong(); // ⑥ 8bytes的读取长度 String clientName = Text.readString(in); // ⑦ 发送请求的客户端名称 Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>(); accessToken.readFields(in); // ⑧ 数据块的访问权限, 安全相关, 不讨论 OutputStream baseStream = NetUtils.getOutputStream(s, datanode.socketWriteTimeout); DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); // blockToken: If InvalidToken, out.writeShort(OP_STATUS_ERROR_ACCESS_TOKEN) // 2. send the block BlockSender blockSender = null; try { try { blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); } catch(IOException e) { out.writeShort(DataTransferProtocol.OP_STATUS_ERROR); // [1] 创建BlockSender对象失败,发送操作失败的状态标示返回给客户端 throw e; // --> catch(IOException e) 不会执行下面的语句哦 } out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // [1] send op status 创建BlockSender成功,发送成功状态标示返回给客户端 long read = blockSender.sendBlock(out, baseStream, null); // send data [2] 会发送checksumHeader + offset + PACKET if (blockSender.isBlockReadFully()) { // 客户端读取完整个数据块, 由客户端验证文件的校验和. 而不是在发送数据时验证 // 3. See if client verification succeeded. This is an optional response from client. if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK && datanode.blockScanner != null) { datanode.blockScanner.verifiedByClient(block); } } } catch ( SocketException ignored ) { // Its ok for remote side to close the connection anytime. } catch ( IOException ioe ) { throw ioe; } finally { IOUtils.closeStream(out); IOUtils.closeStream(blockSender); } }
BlockSender.sendBlock()
/** sendBlock() is used to read block and its metadata and stream the data to either a client or to another datanode. * 读取块和元数据, 通过输出流传送给客户端(读取请求)或者另一个datanode(复制请求) * @param out stream to which the block is written to 数据块要写入的输出流, 输出流即发送出去 * @param baseStream optional. if non-null, out is assumed to be a wrapper over this stream. * This enables optimizations for sending the data, e.g. SocketOutputStream#transferToFully(FileChannel, long, int). 使用FileChannel优化发送数据 * @param throttler for sending data. 发送数据节流器 * @return total bytes reads, including crc. 总共读取的字节,包括校验文件 */ long sendBlock(DataOutputStream out, OutputStream baseStream, BlockTransferThrottler throttler) throws IOException { this.throttler = throttler; initialOffset = offset; long totalRead = 0; OutputStream streamForSendChunks = out; lastCacheDropOffset = initialOffset; if (isLongRead() && blockInFd != null) { // Advise that this file descriptor will be accessed sequentially. NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL); } manageOsCache(); // Trigger readahead of beginning of file if configured. try { // 1. 将checksum header通过DataOutputStream发送到客户端. 客户端通过DataInputStream接收 try { checksum.writeHeader(out); // ① ② 写入checksum header . DataChecksum在构造函数中构造,包含type, bytePerChecksum if ( chunkOffsetOK ) { // ③ 需要发送块的开始位置, 还需要写入offset out.writeLong( offset ); } out.flush(); } catch (IOException e) { throw ioeToSocketException(e); } //socket error // 2. 计算每个packet数据(checksum和数据)的大小 分配packet缓冲区大小(packet header + data + checksum) int maxChunksPerPacket; // 一次Packet可以发送最多多少个chunks int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // [1]packet header+length if (transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream && blockIn instanceof FileInputStream) { // 检查是或允许transferTo, 使用FileChannel来传输数据, 而不是先将数据读取到缓冲区 FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); blockInPosition = fileChannel.position(); // blockInPosition also indicates sendChunks() uses transferTo. streamForSendChunks = baseStream; // assure a mininum buffer size. maxChunksPerPacket = (Math.max(BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO) + bytesPerChecksum - 1)/bytesPerChecksum; } else { maxChunksPerPacket = Math.max(1, (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum); } // packet buffer has to be able to do a normal transfer in the case of recomputing checksum pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; // [2](data length+checksum size)*chunks ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); // 3. 将所有packet写到out. Packet由一系列的Chunk组成. while (endOffset > offset) { manageOsCache(); long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); // ④ offset += len; totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*checksumSize); seqno++; } // 4. 将一整数(int)0写到out, 标记块的结束 mark the end of block out.writeInt(0); // ⑤ out.flush(); } catch (RuntimeException e) { throw new IOException("unexpected runtime exception", e); } finally { close();} blockReadFully = (initialOffset == 0 && offset >= blockLength); return totalRead; }
BlockSender.sendChunks()
/** Sends upto maxChunks chunks of data. 发送一个数据包 */ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) throws IOException { // 1. 计算数据包的长度 // Sends multiple chunks in one packet with a single write(). int len = (int) Math.min(endOffset - offset, (((long) bytesPerChecksum) * ((long) maxChunks))); // truncate len so that any partial chunks will be sent as a final packet. this is not necessary for correctness, // but partial chunks are ones that may be recomputed and sent via buffer copy, so try to minimize those bytes if (len > bytesPerChecksum && len % bytesPerChecksum != 0) { len -= len % bytesPerChecksum; } if (len == 0) return 0; int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; // 计算这个数据包中应该包含有多少个校验数据块 int packetLen = len + numChunks*checksumSize + 4; // len为数据长度, 中间为校验和的长度, 最后的4为下面⑤即数据长度 pkt.clear(); // 参数pkt是在sendBlock()经过计算pktSize的缓冲区 因此使用前要先清空buffer // packetLen是从数据长度字段开始的长度即包括⑤之后的长度. ⑤为4bytes, 然后分别是真正的数据和校验和的长度. // 实际上pkt的大小确定了下面要往该缓冲区pkt放入的数据的多少. 比如packet header分别从①-⑤, 对应的大小为4+8+8+1+4=21+4. // 2. 数据包头部信息写入缓冲区 write packet header pkt.putInt(packetLen); // ① 数据包长度 pkt.putLong(offset); // ② 数据包中的数据在Block中的开始位置 pkt.putLong(seqno); // ③ 数据包的编号 pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); // ④ 是否有数据包标志 ②③④其实对应sendBlock()中while循环的处理 pkt.putInt(len); // ⑤ 数据包中数据的长度 // 3. 校验和写入缓冲区 注: 此时还没发送到客户端. 只有通过OutputStream将buf内容写到OutputStream, 才算发送到客户端 int checksumOff = pkt.position(); // 当前缓冲区的位置及校验和的开始位置. 由此可见首先发送校验和 int checksumLen = numChunks * checksumSize; // 校验和的长度=chunks*4 byte[] buf = pkt.array(); // 缓冲区的大小为pktSize的大小. 用来暂存接下来的校验和和数据. 注: 将数据写到缓冲区中! if (checksumSize > 0 && checksumIn != null) { // ⑥ 从checksumIn输入流将校验和数据读取到缓冲区buf中 checksumIn.readFully(buf, checksumOff, checksumLen); // 将输入流的数据读取到buf开始位置为checksumOff, 长为checksumLen的区域 } // 通过pkt.position计算checksumOff, pkt前面存放了packet header. checksum要接着packet header int dataOff = checksumOff + checksumLen; // 校验和的开始位置+长度=数据的偏移量/开始位置 if (blockInPosition < 0) { // 如果>=0, 则使用零拷贝. 默认为-1. 如果允许零拷贝, 在sendBlock时会设置该值>=0 // 4. 数据写入缓冲区 normal transfer 从blockIn输入流读取块数据到缓冲区中偏移量为dataOff, 长度为len的区域 IOUtils.readFully(blockIn, buf, dataOff, len); // buf紧接着checksum的是data. data的开始位置为checksum的结束位置+1. +1其实由position内部实现 // 5. 对发送的数据验证校验和 (客户端读取数据不会执行此校验) if (verifyChecksum) { int dOff = dataOff; // 要计算的校验块(真正的数据)的起始位置 int cOff = checksumOff; // 校验块对应的校验和的起始位置. 校验块的起始位置每+512bytes, 校验和的起始位置就+4bytes int dLeft = len; // len最开始为多个chunk>512, min会取bytesPerChecksum=512. 在计算每个校验块后, dLeft递减512, 最后 dLeft可能<512 for (int i=0; i<numChunks; i++) { // 对每个校验块chunk计算校验和 checksum.reset(); int dLen = Math.min(dLeft, bytesPerChecksum); // 如果len不足512bytes, 则只对这部分数据计算校验和, 因为最后一部分可能不足512bytes checksum.update(buf, dOff, dLen); // 缓冲区buf存放的就是已经从blockIn读取出来的真正数据了. 可以直接对数据计算校验和 if (!checksum.compare(buf, cOff)) { // 比较校验和cOff开始和经过上面计算的校验和 throw new ChecksumException("Checksum failed at " + (offset + len - dLeft), len); } dLeft -= dLen; // 剩余参与计算的校验块的大小递减 dOff += dLen; // 起始位置递增, 即参与计算的下一个校验块的起始位置 cOff += checksumSize; // 校验和的起始位置经过一个校验块的计算也递增4bytes, 为的是和下一个校验块经过计算的校验和进行比较 } } // only recompute checksum if we can't trust the meta data due to concurrent writes 存在竞争条件重新计算校验和 if (memoizedBlock.hasBlockChanged(len)) {ChecksumUtil.updateChunkChecksum(buf, checksumOff, dataOff, len, checksum);} // 6. 将缓冲区的数据全部写到输出流OutputStream中, 完成向接收端的数据发送 out.write(buf, 0, dataOff + len); // ⑦ 缓冲区从0开始, 一直到真正数据的结束位置. 即发送整个PACKET } else { // 如果允许零拷贝, 在调用该方法之前的sendBlock就设置了blockInPosition为正数. 就会执行此零拷贝的优化方式. try { // use transferTo(). Checks on out and blockIn are already done. 通过Socket发送, 即使用FileChannel来优化发送数据, 而不是通过流的方式 // 4. 采用零拷贝主要针对的是要发送的数据. PACKET的header和checksum并没有使用零拷贝. 因为那部分数据比较小. SocketOutputStream sockOut = (SocketOutputStream) out; // 在sendBlock中已经确保了对象类型的正确性, 才允许进入零拷贝 FileChannel fileChannel = ((FileInputStream) blockIn).getChannel(); // 所以这里可以放心将out和blockIn转为零拷贝需要的类型 if (memoizedBlock.hasBlockChanged(len)) { fileChannel.position(blockInPosition); IOUtils.readFileChannelFully(fileChannel, buf, dataOff, len); ChecksumUtil.updateChunkChecksum(buf, checksumOff, dataOff, len, checksum); sockOut.write(buf, 0, dataOff + len); // ⑦ 数据块 } else { // 5. 首先将buf缓冲区的数据先发送到接收端对应的SocketOutputStream. 这部分数据是PACKET的header和checksum sockOut.write(buf, 0, dataOff); // ⑦ 使用Socket输出流发送缓存数据包,直接写到Socket输出流 first write the packet // 6. 接着使用零拷贝发送真正的数据. 普通的传输方式是将blockIn先写到buf中, 这之后还要将buf中的数据发送到接收端的OutputStream. // 而零拷贝直接将blockIn传输到接收端的socketOut, 免去了中间多余的两部分内存拷贝操作和上下文切换带来的系统开销. sockOut.transferToFully(fileChannel, blockInPosition, len); // no need to flush. since we know out is not a buffered stream. 零拷贝! } blockInPosition += len; } catch (IOException e) { // exception while writing to the client (well, with transferTo(), it could also be while reading from the local file). throw ioeToSocketException(e); } } if (throttler != null) { throttler.throttle(packetLen); } // rebalancing so throttle 调整发送速度 return len; }
全文请期待 http://zqhxuyuan.github.com 的相关博文
相关推荐
深入 DataNode 的源码,可以学习到数据的读取和写入流程,以及心跳机制和数据块报告。 3. **Block Placement Policy**:HDFS 的数据块放置策略决定了数据块在集群中的分布,有助于平衡负载和提高容错性。了解这些...
本文将重点探讨HDFS的源码分析,基于《Hadoop2.x HDFS源码剖析》这本书中的参考注释。首先,我们来看HDFS的核心组件——NameNode和DataNode。 1. NameNode:作为HDFS的元数据管理节点,NameNode负责维护文件系统的...
源码可以展示如何设置Direct Stream模式,实现Spark从Kafka topic中读取数据,以及如何将处理结果写回Kafka。 4. Spark与HBase的整合: HBase是一个基于Hadoop的NoSQL数据库,适合存储大量结构化或半结构化数据。...
此外,源码的结构和注释也是评判标准之一,良好的代码组织和清晰的注释可以提升代码的可读性和可维护性,这也是编程规范的重要体现。 总的来说,这个压缩包文件中的源码揭示了一个完整的数学建模过程,包括问题理解...
HBase的源码提供了丰富的注释和示例,可以加深对过滤器实现的理解。 总之,HBase过滤器是数据查询和分析的强大工具。正确地使用过滤器不仅可以提高查询效率,还能降低存储和计算资源的消耗。因此,对于处理大数据的...
FileUtil 的源码注释表明,它是一个远程文件系统的文件复制到本地的操作。这里没有说明远程文件系统是什么类型,因为这是 FileSystem 通用方法,可以根据文件系统的需求来继承覆盖里面的方法。 在使用 FileUtil 时...
HBase,基于Apache Hadoop的分布式键值存储系统,是设计用于处理大规模数据的NoSQL数据库。它支持水平扩展,能够处理PB级别的数据,并且提供实时查询功能。HBase的数据模型是基于行、列族和时间戳的,适合于大数据的...
2. **文件操作**:源码会包含读取和重命名文件的操作,这可能涉及到`os`(Python)、`System.IO`(C#)或`File`(Java)等库的使用。 3. **文件后缀处理**:程序需要添加后缀到文件名,这可能通过字符串操作完成,...
在标签部分提到了“源码工具”,这可能意味着文章会涉及到Spark的源码编译过程,或者是介绍一些辅助Spark开发的工具。 在提供的部分内容中,大量提到了IntelliJ IDEA的快捷键和功能,这表明文章在介绍Spark开发环境...
标题中的“备注标签博文篇”可能是指一篇关于在编程或文档管理中使用备注标签的文章,这类标签常用于标记代码、注释或者文件,以便于后期的查找和理解。描述中的“NULL”意味着没有提供具体文章内容,所以我们需要...
ZooKeeper是一个分布式的,开放源码的协调服务,它为分布式应用提供一致性服务,如命名服务、配置管理、组服务、分布式同步和分区间复制等。这个Javadoc包含了该版本的API详细说明,是开发人员理解和使用ZooKeeper ...
5. **并行处理和大数据处理**:由于基因组数据量巨大,项目可能利用多线程或分布式计算框架(如Hadoop或Spark)来处理数据。 6. **测试框架**:如JUnit,用于编写和执行单元测试,确保代码的正确性和稳定性。 7. *...
这些IDE不仅限于特定编程语言,还支持搜索注释、变量名、函数名等,并能提供上下文信息。 对于大型项目或代码库,Git这样的版本控制系统也提供了搜索功能。通过`git grep`,我们可以快速在提交历史中找到特定字符串...
7. **文档和注释**:优秀的源代码通常会有详细的注释和文档,帮助开发者理解各个部分的功能和用法。 学习和研究这个压缩包,可以帮助开发者深入理解如何在Java环境下设计和实现AI模型,同时也为那些想要将AI技术...
Apache Spark是一款用于大规模数据处理的开源框架,它提供了内存计算、分布式计算等特性,使得数据处理速度相比Hadoop MapReduce有了显著提升。Spark的主要组件包括:Spark Core、Spark SQL、Spark Streaming、MLlib...
4. **数据存储**:可能使用了像PostGIS(扩展了PostgreSQL的地理空间数据库)这样的地理空间数据库,或者Hadoop等大数据处理框架来存储和处理大量地理数据。 5. **并发编程**:对于大规模数据处理,Java的并发特性...
例如,Solr 安装目录、数据目录、日志文件存储位置等,确保所有必要的文件都被放置在正确的目录下,以便 Solr 正确读取配置和数据。 #### API 示例 文档中包含了多个 API 示例,帮助开发者理解和掌握 Solr 提供的...