- 浏览: 423032 次
- 性别:
- 来自: 北京
-
最新评论
-
springdata_spring:
apache lucene开源框架demo使用实例教程源代码下 ...
有关Lucene的问题(6):Lucene的事务性 -
jaychang:
必须要感谢作者的分享,对理解Lucene的工作原理帮助很大
Lucene学习总结之一:全文检索的基本原理 -
yin_kaihua:
...
Lucene学习总结之三:Lucene的索引文件格式 (1) -
djh122:
...
Lucene 原理与代码分析完整版 -
wayne0830:
多谢楼主分享!
Lucene 原理与代码分析完整版
一、文件的打开
1.1、客户端
HDFS打开一个文件,需要在客户端调用DistributedFileSystem.open(Path f, int bufferSize),其实现为:
public FSDataInputStream open(Path f, int bufferSize) throws IOException { return new DFSClient.DFSDataInputStream( dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); } |
其中dfs为DistributedFileSystem的成员变量DFSClient,其open函数被调用,其中创建一个DFSInputStream(src, buffersize, verifyChecksum)并返回。
在DFSInputStream的构造函数中,openInfo函数被调用,其主要从namenode中得到要打开的文件所对应的blocks的信息,实现如下:
synchronized void openInfo() throws IOException { LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize); this.locatedBlocks = newInfo; this.currentNode = null; } |
private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException { return namenode.getBlockLocations(src, start, length); } |
LocatedBlocks主要包含一个链表的List<LocatedBlock> blocks,其中每个LocatedBlock包含如下信息:
- Block b:此block的信息
- long offset:此block在文件中的偏移量
- DatanodeInfo[] locs:此block位于哪些DataNode上
上面namenode.getBlockLocations是一个RPC调用,最终调用NameNode类的getBlockLocations函数。
1.2、NameNode
NameNode.getBlockLocations实现如下:
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { return namesystem.getBlockLocations(getClientMachine(), src, offset, length); } |
namesystem是NameNode一个成员变量,其类型为FSNamesystem,保存的是NameNode的name space树,其中一个重要的成员变量为FSDirectory dir。
FSDirectory和Lucene中的FSDirectory没有任何关系,其主要包括FSImage fsImage,用于读写硬盘上的fsimage文件,FSImage类有成员变量FSEditLog editLog,用于读写硬盘上的edit文件,这两个文件的关系在上一篇文章中已经解释过。
FSDirectory还有一个重要的成员变量INodeDirectoryWithQuota rootDir,INodeDirectoryWithQuota的父类为INodeDirectory,实现如下:
public class INodeDirectory extends INode { …… private List<INode> children; …… } |
由此可见INodeDirectory本身是一个INode,其中包含一个链表的INode,此链表中,如果仍为文件夹,则是类型INodeDirectory,如果是文件,则是类型INodeFile,INodeFile中有成员变量BlockInfo blocks[],是此文件包含的block的信息。显然这是一棵树形的结构。
FSNamesystem.getBlockLocations函数如下:
public LocatedBlocks getBlockLocations(String src, long offset, long length, boolean doAccessTime) throws IOException { final LocatedBlocks ret = getBlockLocationsInternal(src, dir.getFileINode(src), offset, length, Integer.MAX_VALUE, doAccessTime); return ret; } |
dir.getFileINode(src)通过路径名从文件系统树中找到INodeFile,其中保存的是要打开的文件的INode的信息。
getBlockLocationsInternal的实现如下:
private synchronized LocatedBlocks getBlockLocationsInternal(String src, INodeFile inode, long offset, long length, int nrBlocksToReturn, boolean doAccessTime) throws IOException { //得到此文件的block信息 Block[] blocks = inode.getBlocks(); List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length); //计算从offset开始,长度为length所涉及的blocks int curBlk = 0; long curPos = 0, blkSize = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { blkSize = blocks[curBlk].getNumBytes(); if (curPos + blkSize > offset) { //当offset在curPos和curPos + blkSize之间的时候,curBlk指向offset所在的block break; } curPos += blkSize; } long endOff = offset + length; //循环,依次遍历从curBlk开始的每个block,直到当前位置curPos越过endOff do { int numNodes = blocksMap.numNodes(blocks[curBlk]); int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas(); int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); boolean blockCorrupt = (numCorruptNodes == numNodes); int numMachineSet = blockCorrupt ? numNodes : (numNodes - numCorruptNodes); //依次找到此block所对应的datanode,将其中没有损坏的放入machineSet中 DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet]; if (numMachineSet > 0) { numNodes = 0; for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) { DatanodeDescriptor dn = it.next(); boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn); if (blockCorrupt || (!blockCorrupt && !replicaCorrupt)) machineSet[numNodes++] = dn; } } //使用此machineSet和当前的block构造一个LocatedBlock results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos, blockCorrupt)); curPos += blocks[curBlk].getNumBytes(); curBlk++; } while (curPos < endOff && curBlk < blocks.length && results.size() < nrBlocksToReturn); //使用此LocatedBlock链表构造一个LocatedBlocks对象返回 return inode.createLocatedBlocks(results); } |
1.3、客户端
通过RPC调用,在NameNode得到的LocatedBlocks对象,作为成员变量构造DFSInputStream对象,最后包装为FSDataInputStream返回给用户。
二、文件的读取
2.1、客户端
文件读取的时候,客户端利用文件打开的时候得到的FSDataInputStream.read(long position, byte[] buffer, int offset, int length)函数进行文件读操作。
FSDataInputStream会调用其封装的DFSInputStream的read(long position, byte[] buffer, int offset, int length)函数,实现如下:
public int read(long position, byte[] buffer, int offset, int length) throws IOException { long filelen = getFileLength(); int realLen = length; if ((position + length) > filelen) { realLen = (int)(filelen - position); } //首先得到包含从offset到offset + length内容的block列表 //比如对于64M一个block的文件系统来说,欲读取从100M开始,长度为128M的数据,则block列表包括第2,3,4块block List<LocatedBlock> blockRange = getBlockRange(position, realLen); int remaining = realLen; //对每一个block,从中读取内容 //对于上面的例子,对于第2块block,读取从36M开始,读取长度28M,对于第3块,读取整一块64M,对于第4块,读取从0开始,长度为36M,共128M数据 for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset); remaining -= bytesToRead; position += bytesToRead; offset += bytesToRead; } assert remaining == 0 : "Wrong number of bytes read."; if (stats != null) { stats.incrementBytesRead(realLen); } return realLen; } |
其中getBlockRange函数如下:
private synchronized List<LocatedBlock> getBlockRange(long offset, long length) throws IOException { List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>(); //首先从缓存的locatedBlocks中查找offset所在的block在缓存链表中的位置 int blockIdx = locatedBlocks.findBlock(offset); if (blockIdx < 0) { // block is not cached blockIdx = LocatedBlocks.getInsertIndex(blockIdx); } long remaining = length; long curOff = offset; while(remaining > 0) { LocatedBlock blk = null; //按照blockIdx的位置找到block if(blockIdx < locatedBlocks.locatedBlockCount()) blk = locatedBlocks.get(blockIdx); //如果block为空,则缓存中没有此block,则直接从NameNode中查找这些block,并加入缓存 if (blk == null || curOff < blk.getStartOffset()) { LocatedBlocks newBlocks; newBlocks = callGetBlockLocations(namenode, src, curOff, remaining); locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks()); continue; } //如果block找到,则放入结果集 blockRange.add(blk); long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff; remaining -= bytesRead; curOff += bytesRead; //取下一个block blockIdx++; } return blockRange; } |
其中fetchBlockByteRange实现如下:
private void fetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset) throws IOException { Socket dn = null; int numAttempts = block.getLocations().length; //此while循环为读取失败后的重试次数 while (dn == null && numAttempts-- > 0 ) { //选择一个DataNode来读取数据 DNAddrPair retval = chooseDataNode(block); DatanodeInfo chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; BlockReader reader = null; try { //创建Socket连接到DataNode dn = socketFactory.createSocket(); dn.connect(targetAddr, socketTimeout); dn.setSoTimeout(socketTimeout); int len = (int) (end - start + 1); //利用建立的Socket链接,生成一个reader负责从DataNode读取数据 reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName); //读取数据 int nread = reader.readAll(buf, offset, len); return; } finally { IOUtils.closeStream(reader); IOUtils.closeSocket(dn); dn = null; } //如果读取失败,则将此DataNode标记为失败节点 addToDeadNodes(chosenNode); } } |
BlockReader.newBlockReader函数实现如下:
public static BlockReader newBlockReader( Socket sock, String file, long blockId, long genStamp, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException { //使用Socket建立写入流,向DataNode发送读指令 DataOutputStream out = new DataOutputStream( new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))); out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); out.write( DataTransferProtocol.OP_READ_BLOCK ); out.writeLong( blockId ); out.writeLong( genStamp ); out.writeLong( startOffset ); out.writeLong( len ); Text.writeString(out, clientName); out.flush(); //使用Socket建立读入流,用于从DataNode读取数据 DataInputStream in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize)); DataChecksum checksum = DataChecksum.newDataChecksum( in ); long firstChunkOffset = in.readLong(); //生成一个reader,主要包含读入流,用于读取数据 return new BlockReader( file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock ); } |
BlockReader的readAll函数就是用上面生成的DataInputStream读取数据。
2.2、DataNode
在DataNode启动的时候,会调用函数startDataNode,其中与数据读取有关的逻辑如下:
void startDataNode(Configuration conf, AbstractList<File> dataDirs ) throws IOException { …… // 建立一个ServerSocket,并生成一个DataXceiverServer来监控客户端的链接 ServerSocket ss = (socketWriteTimeout > 0) ? ServerSocketChannel.open().socket() : new ServerSocket(); Server.bind(ss, socAddr, 0); ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); // adjust machine name with the actual port tmpPort = ss.getLocalPort(); selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(), tmpPort); this.dnRegistration.setName(machineName + ":" + tmpPort); this.threadGroup = new ThreadGroup("dataXceiverServer"); this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this)); this.threadGroup.setDaemon(true); // auto destroy when empty …… } |
DataXceiverServer.run()函数如下:
public void run() { while (datanode.shouldRun) { //接受客户端的链接 Socket s = ss.accept(); s.setTcpNoDelay(true); //生成一个线程DataXceiver来对建立的链接提供服务 new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start(); } try { ss.close(); } catch (IOException ie) { LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " + StringUtils.stringifyException(ie)); } } |
DataXceiver.run()函数如下:
public void run() { DataInputStream in=null; try { //建立一个输入流,读取客户端发送的指令 in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE)); short version = in.readShort(); boolean local = s.getInetAddress().equals(s.getLocalAddress()); byte op = in.readByte(); // Make sure the xciver count is not exceeded int curXceiverCount = datanode.getXceiverCount(); long startTime = DataNode.now(); switch ( op ) { //读取 case DataTransferProtocol.OP_READ_BLOCK: //真正的读取数据 readBlock( in ); datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime); if (local) datanode.myMetrics.readsFromLocalClient.inc(); else datanode.myMetrics.readsFromRemoteClient.inc(); break; //写入 case DataTransferProtocol.OP_WRITE_BLOCK: //真正的写入数据 writeBlock( in ); datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime); if (local) datanode.myMetrics.writesFromLocalClient.inc(); else datanode.myMetrics.writesFromRemoteClient.inc(); break; //其他的指令 …… } } catch (Throwable t) { LOG.error(datanode.dnRegistration + ":DataXceiver",t); } finally { IOUtils.closeStream(in); IOUtils.closeSocket(s); dataXceiverServer.childSockets.remove(s); } } |
private void readBlock(DataInputStream in) throws IOException { //读取指令 long blockId = in.readLong(); Block block = new Block( blockId, 0 , in.readLong()); long startOffset = in.readLong(); long length = in.readLong(); String clientName = Text.readString(in); //创建一个写入流,用于向客户端写数据 OutputStream baseStream = NetUtils.getOutputStream(s, datanode.socketWriteTimeout); DataOutputStream out = new DataOutputStream( new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); //生成BlockSender用于读取本地的block的数据,并发送给客户端 //BlockSender有一个成员变量InputStream blockIn用于读取本地block的数据 BlockSender blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status //向客户端写入数据 long read = blockSender.sendBlock(out, baseStream, null); …… } finally { IOUtils.closeStream(out); IOUtils.closeStream(blockSender); } } |
三、文件的写入
下面解析向hdfs上传一个文件的过程。
3.1、客户端
上传一个文件到hdfs,一般会调用DistributedFileSystem.create,其实现如下:
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { return new FSDataOutputStream (dfs.create(getPathName(f), permission, overwrite, replication, blockSize, progress, bufferSize), statistics); } |
其最终生成一个FSDataOutputStream用于向新生成的文件中写入数据。其成员变量dfs的类型为DFSClient,DFSClient的create函数如下:
public OutputStream create(String src, FsPermission permission, boolean overwrite, short replication, long blockSize, Progressable progress, int buffersize ) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getDefault(); } FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf)); OutputStream result = new DFSOutputStream(src, masked, overwrite, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); leasechecker.put(src, result); return result; } |
其中构造了一个DFSOutputStream,在其构造函数中,同过RPC调用NameNode的create来创建一个文件。
当然,构造函数中还做了一件重要的事情,就是streamer.start(),也即启动了一个pipeline,用于写数据,在写入数据的过程中,我们会仔细分析。
DFSOutputStream(String src, FsPermission masked, boolean overwrite, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum); computePacketChunkSize(writePacketSize, bytesPerChecksum); try { namenode.create( src, masked, clientName, overwrite, replication, blockSize); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, QuotaExceededException.class); } streamer.start(); } |
3.2、NameNode
NameNode的create函数调用namesystem.startFile函数,其又调用startFileInternal函数,实现如下:
private synchronized void startFileInternal(String src, PermissionStatus permissions, String holder, String clientMachine, boolean overwrite, boolean append, short replication, long blockSize ) throws IOException { ...... //创建一个新的文件,状态为under construction,没有任何data block与之对应 long genstamp = nextGenerationStamp(); INodeFileUnderConstruction newNode = dir.addFile(src, permissions, replication, blockSize, holder, clientMachine, clientNode, genstamp); ...... } |
3.3、客户端
下面轮到客户端向新创建的文件中写入数据了,一般会使用FSDataOutputStream的write函数,最终会调用DFSOutputStream的writeChunk函数:
按照hdfs的设计,对block的数据写入使用的是pipeline的方式,也即将数据分成一个个的package,如果需要复制三分,分别写入DataNode 1, 2, 3,则会进行如下的过程:
- 首先将package 1写入DataNode 1
- 然后由DataNode 1负责将package 1写入DataNode 2,同时客户端可以将pacage 2写入DataNode 1
- 然后DataNode 2负责将package 1写入DataNode 3, 同时客户端可以讲package 3写入DataNode 1,DataNode 1将package 2写入DataNode 2
- 就这样将一个个package排着队的传递下去,直到所有的数据全部写入并复制完毕
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException { //创建一个package,并写入数据 currentPacket = new Packet(packetSize, chunksPerPacket, bytesCurBlock); currentPacket.writeChecksum(checksum, 0, cklen); currentPacket.writeData(b, offset, len); currentPacket.numChunks++; bytesCurBlock += len; //如果此package已满,则放入队列中准备发送 if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize) { ...... dataQueue.addLast(currentPacket); //唤醒等待dataqueue的传输线程,也即DataStreamer dataQueue.notifyAll(); currentPacket = null; ...... } } |
DataStreamer的run函数如下:
public void run() { while (!closed && clientRunning) { Packet one = null; synchronized (dataQueue) { //如果队列中没有package,则等待 while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) { try { dataQueue.wait(1000); } catch (InterruptedException e) { } doSleep = false; } try { //得到队列中的第一个package one = dataQueue.getFirst(); long offsetInBlock = one.offsetInBlock; //由NameNode分配block,并生成一个写入流指向此block if (blockStream == null) { nodes = nextBlockOutputStream(src); response = new ResponseProcessor(nodes); response.start(); } ByteBuffer buf = one.getBuffer(); //将package从dataQueue移至ackQueue,等待确认 dataQueue.removeFirst(); dataQueue.notifyAll(); synchronized (ackQueue) { ackQueue.addLast(one); ackQueue.notifyAll(); } //利用生成的写入流将数据写入DataNode中的block blockStream.write(buf.array(), buf.position(), buf.remaining()); if (one.lastPacketInBlock) { blockStream.writeInt(0); //表示此block写入完毕 } blockStream.flush(); } catch (Throwable e) { } } ...... } |
其中重要的一个函数是nextBlockOutputStream,实现如下:
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException { LocatedBlock lb = null; boolean retry = false; DatanodeInfo[] nodes; int count = conf.getInt("dfs.client.block.write.retries", 3); boolean success; do { ...... //由NameNode为文件分配DataNode和block lb = locateFollowingBlock(startTime); block = lb.getBlock(); nodes = lb.getLocations(); //创建向DataNode的写入流 success = createBlockOutputStream(nodes, clientName, false); ...... } while (retry && --count >= 0); return nodes; } |
locateFollowingBlock中通过RPC调用namenode.addBlock(src, clientName)函数
3.4、NameNode
NameNode的addBlock函数实现如下:
public LocatedBlock addBlock(String src, String clientName) throws IOException { LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName); return locatedBlock; } |
FSNamesystem的getAdditionalBlock实现如下:
public LocatedBlock getAdditionalBlock(String src, String clientName ) throws IOException { long fileLength, blockSize; int replication; DatanodeDescriptor clientNode = null; Block newBlock = null; ...... //为新的block选择DataNode DatanodeDescriptor targets[] = replicator.chooseTarget(replication, clientNode, null, blockSize); ...... //得到文件路径中所有path的INode,其中最后一个是新添加的文件对的INode,状态为under construction INode[] pathINodes = dir.getExistingPathINodes(src); int inodesLen = pathINodes.length; INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) pathINodes[inodesLen - 1]; //为文件分配block, 并设置在那写DataNode上 newBlock = allocateBlock(src, pathINodes); pendingFile.setTargets(targets); ...... return new LocatedBlock(newBlock, targets, fileLength); } |
3.5、客户端
在分配了DataNode和block以后,createBlockOutputStream开始写入数据。
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client, boolean recoveryFlag) { //创建一个socket,链接DataNode InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName()); s = socketFactory.createSocket(); int timeoutValue = 3000 * nodes.length + socketTimeout; s.connect(target, timeoutValue); s.setSoTimeout(timeoutValue); s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE); long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length + datanodeWriteTimeout; DataOutputStream out = new DataOutputStream( new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), DataNode.SMALL_BUFFER_SIZE)); blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); //写入指令 out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); out.write( DataTransferProtocol.OP_WRITE_BLOCK ); out.writeLong( block.getBlockId() ); out.writeLong( block.getGenerationStamp() ); out.writeInt( nodes.length ); out.writeBoolean( recoveryFlag ); Text.writeString( out, client ); out.writeBoolean(false); out.writeInt( nodes.length - 1 ); //注意,次循环从1开始,而非从0开始。将除了第一个DataNode以外的另外两个DataNode的信息发送给第一个DataNode, 第一个DataNode可以根据此信息将数据写给另两个DataNode for (int i = 1; i < nodes.length; i++) { nodes[i].write(out); } checksum.writeHeader( out ); out.flush(); firstBadLink = Text.readString(blockReplyStream); if (firstBadLink.length() != 0) { throw new IOException("Bad connect ack with firstBadLink " + firstBadLink); } blockStream = out; } |
客户端在DataStreamer的run函数中创建了写入流后,调用blockStream.write将数据写入DataNode
3.6、DataNode
DataNode的DataXceiver中,收到指令DataTransferProtocol.OP_WRITE_BLOCK则调用writeBlock函数:
private void writeBlock(DataInputStream in) throws IOException { DatanodeInfo srcDataNode = null; //读入头信息 Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong()); int pipelineSize = in.readInt(); // num of datanodes in entire pipeline boolean isRecovery = in.readBoolean(); // is this part of recovery? String client = Text.readString(in); // working on behalf of this client boolean hasSrcDataNode = in.readBoolean(); // is src node info present if (hasSrcDataNode) { srcDataNode = new DatanodeInfo(); srcDataNode.readFields(in); } int numTargets = in.readInt(); if (numTargets < 0) { throw new IOException("Mislabelled incoming datastream."); } //读入剩下的DataNode列表,如果当前是第一个DataNode,则此列表中收到的是第二个,第三个DataNode的信息,如果当前是第二个DataNode,则受到的是第三个DataNode的信息 DatanodeInfo targets[] = new DatanodeInfo[numTargets]; for (int i = 0; i < targets.length; i++) { DatanodeInfo tmp = new DatanodeInfo(); tmp.readFields(in); targets[i] = tmp; } DataOutputStream mirrorOut = null; // stream to next target DataInputStream mirrorIn = null; // reply from next target DataOutputStream replyOut = null; // stream to prev target Socket mirrorSock = null; // socket to next target BlockReceiver blockReceiver = null; // responsible for data handling String mirrorNode = null; // the name:port of next target String firstBadLink = ""; // first datanode that failed in connection setup try { //生成一个BlockReceiver, 其有成员变量DataInputStream in为从客户端或者上一个DataNode读取数据,还有成员变量DataOutputStream mirrorOut,用于向下一个DataNode写入数据,还有成员变量OutputStream out用于将数据写入本地。 blockReceiver = new BlockReceiver(block, in, s.getRemoteSocketAddress().toString(), s.getLocalSocketAddress().toString(), isRecovery, client, srcDataNode, datanode); // get a connection back to the previous target replyOut = new DataOutputStream( NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); //如果当前不是最后一个DataNode,则同下一个DataNode建立socket连接 if (targets.length > 0) { InetSocketAddress mirrorTarget = null; // Connect to backup machine mirrorNode = targets[0].getName(); mirrorTarget = NetUtils.createSocketAddr(mirrorNode); mirrorSock = datanode.newSocket(); int timeoutValue = numTargets * datanode.socketTimeout; int writeTimeout = datanode.socketWriteTimeout + (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets); mirrorSock.connect(mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE); //创建向下一个DataNode写入数据的流 mirrorOut = new DataOutputStream( new BufferedOutputStream( NetUtils.getOutputStream(mirrorSock, writeTimeout), SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock)); mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK ); mirrorOut.writeLong( block.getBlockId() ); mirrorOut.writeLong( block.getGenerationStamp() ); mirrorOut.writeInt( pipelineSize ); mirrorOut.writeBoolean( isRecovery ); Text.writeString( mirrorOut, client ); mirrorOut.writeBoolean(hasSrcDataNode); if (hasSrcDataNode) { // pass src node information srcDataNode.write(mirrorOut); } mirrorOut.writeInt( targets.length - 1 ); //此出也是从1开始,将除了下一个DataNode的其他DataNode信息发送给下一个DataNode for ( int i = 1; i < targets.length; i++ ) { targets[i].write( mirrorOut ); } blockReceiver.writeChecksumHeader(mirrorOut); mirrorOut.flush(); } //使用BlockReceiver接受block String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets.length); ...... } finally { // close all opened streams IOUtils.closeStream(mirrorOut); IOUtils.closeStream(mirrorIn); IOUtils.closeStream(replyOut); IOUtils.closeSocket(mirrorSock); IOUtils.closeStream(blockReceiver); } } |
BlockReceiver的receiveBlock函数中,一段重要的逻辑如下:
void receiveBlock( DataOutputStream mirrOut, // output to next datanode DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, BlockTransferThrottler throttlerArg, int numTargets) throws IOException { ...... //不断的接受package,直到结束 while (receivePacket() > 0) {} if (mirrorOut != null) { try { mirrorOut.writeInt(0); // mark the end of the block mirrorOut.flush(); } catch (IOException e) { handleMirrorOutError(e); } } ...... } |
BlockReceiver的receivePacket函数如下:
private int receivePacket() throws IOException { //从客户端或者上一个节点接收一个package int payloadLen = readNextPacket(); buf.mark(); //read the header buf.getInt(); // packet length offsetInBlock = buf.getLong(); // get offset of packet in block long seqno = buf.getLong(); // get seqno boolean lastPacketInBlock = (buf.get() != 0); int endOfHeader = buf.position(); buf.reset(); setBlockPosition(offsetInBlock); //将package写入下一个DataNode if (mirrorOut != null) { try { mirrorOut.write(buf.array(), buf.position(), buf.remaining()); mirrorOut.flush(); } catch (IOException e) { handleMirrorOutError(e); } } buf.position(endOfHeader); int len = buf.getInt(); offsetInBlock += len; int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* checksumSize; int checksumOff = buf.position(); int dataOff = checksumOff + checksumLen; byte pktBuf[] = buf.array(); buf.position(buf.limit()); // move to the end of the data. ...... //将数据写入本地的block out.write(pktBuf, dataOff, len); /// flush entire packet before sending ack flush(); // put in queue for pending acks if (responder != null) { ((PacketResponder)responder.getRunnable()).enqueue(seqno, lastPacketInBlock); } return payloadLen; } |
相关推荐
毕业设计选题 -未来生鲜运输车设计.pptx
内容概要:本文详细探讨了基于樽海鞘算法(SSA)优化的极限学习机(ELM)在回归预测任务中的应用,并与传统的BP神经网络、广义回归神经网络(GRNN)以及未优化的ELM进行了性能对比。首先介绍了ELM的基本原理,即通过随机生成输入层与隐藏层之间的连接权重及阈值,仅需计算输出权重即可快速完成训练。接着阐述了SSA的工作机制,利用樽海鞘群体觅食行为优化ELM的输入权重和隐藏层阈值,从而提高模型性能。随后分别给出了BP、GRNN、ELM和SSA-ELM的具体实现代码,并通过波士顿房价数据集和其他工业数据集验证了各模型的表现。结果显示,SSA-ELM在预测精度方面显著优于其他三种方法,尽管其训练时间较长,但在实际应用中仍具有明显优势。 适合人群:对机器学习尤其是回归预测感兴趣的科研人员和技术开发者,特别是那些希望深入了解ELM及其优化方法的人。 使用场景及目标:适用于需要高效、高精度回归预测的应用场景,如金融建模、工业数据分析等。主要目标是提供一种更为有效的回归预测解决方案,尤其是在处理大规模数据集时能够保持较高的预测精度。 其他说明:文中提供了详细的代码示例和性能对比图表,帮助读者更好地理解和复现实验结果。同时提醒使用者注意SSA参数的选择对模型性能的影响,建议进行参数敏感性分析以获得最佳效果。
2025年中国生成式AI大会PPT(4-1)
内容概要:本文详细介绍了基于Simulink平台构建无刷直流电机(BLDC)双闭环调速系统的全过程。首先阐述了双闭环控制系统的基本架构,即外层速度环和内层电流环的工作原理及其相互关系。接着深入探讨了PWM生成模块的设计,特别是占空比计算方法的选择以及三角波频率的设定。文中还提供了详细的电机参数设置指导,如转动惯量、电感、电阻等,并强调了参数选择对系统性能的影响。此外,针对PI控制器的参数整定给出了具体的公式和经验值,同时分享了一些实用的调试技巧,如避免转速超调、处理启动抖动等问题的方法。最后,通过仿真实验展示了系统的稳定性和鲁棒性,验证了所提出方法的有效性。 适用人群:从事电机控制研究的技术人员、自动化工程领域的研究生及科研工作者。 使用场景及目标:适用于需要深入了解和掌握无刷直流电机双闭环调速系统设计与优化的人群。主要目标是帮助读者学会利用Simulink进行BLDC电机控制系统的建模、仿真和参数优化,从而提高系统的稳定性和响应速度。 其他说明:文章不仅提供了理论知识,还包括了许多实践经验和技术细节,有助于读者更好地理解和应用相关技术。
内容概要:本文详细介绍了西门子S7-1200 PLC与施耐德ATV310/312变频器通过Modbus RTU进行通讯的具体实现步骤和调试技巧。主要内容涵盖硬件接线、通讯参数配置、控制启停、设定频率、读取运行参数的方法以及常见的调试问题及其解决方案。文中提供了具体的代码示例,帮助读者理解和实施通讯程序。此外,还强调了注意事项,如地址偏移量、数据格式转换和超时匹配等。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些需要将西门子PLC与施耐德变频器进行集成的工作人员。 使用场景及目标:适用于需要通过Modbus RTU协议实现PLC与变频器通讯的工程项目。目标是确保通讯稳定可靠,掌握解决常见问题的方法,提高调试效率。 其他说明:文中提到的实际案例和调试经验有助于读者避免常见错误,快速定位并解决问题。建议读者在实践中结合提供的代码示例和调试工具进行操作。
内容概要:本文详细介绍了如何使用Verilog在FPGA上实现IIC(Inter-Integrated Circuit)主从机驱动。主要内容包括从机和主机的设计,特别是状态机的实现、寄存器读取、时钟分频策略、SDA线的三态控制等关键技术。文中还提供了详细的代码片段,展示了从机地址匹配逻辑、主机时钟生成逻辑、顶层模块的连接方法以及仿真实验的具体步骤。此外,文章讨论了一些常见的调试问题,如总线竞争、时序不匹配等,并给出了相应的解决方案。 适合人群:具备一定FPGA开发基础的技术人员,尤其是对IIC协议感兴趣的嵌入式系统开发者。 使用场景及目标:适用于需要在FPGA平台上实现高效、可靠的IIC通信的应用场景。主要目标是帮助读者掌握IIC协议的工作原理,能够独立完成IIC主从机系统的开发和调试。 其他说明:文章不仅提供了理论讲解,还包括了大量的实战经验和代码实例,有助于读者更好地理解和应用所学知识。同时,文章还提供了一个思考题,引导读者进一步探索多主设备仲裁机制的设计思路。
内容概要:本文介绍了一款基于C#开发的拖拽式Halcon可视化抓边、抓圆控件,旨在简化机器视觉项目中的测量任务。该控件通过拖拽操作即可快速生成测量区域,自动完成边缘坐标提取,并提供实时反馈。文中详细描述了控件的工作原理和技术细节,如坐标系转换、卡尺生成、边缘检测算法封装以及动态参数调试等功能。此外,还讨论了一些常见问题及其解决方案,如坐标系差异、内存管理等。 适合人群:从事机器视觉开发的技术人员,尤其是熟悉C#和Halcon的开发者。 使用场景及目标:适用于需要频繁进行边缘和圆形特征测量的工业自动化项目,能够显著提高测量效率并减少编码工作量。主要目标是将复杂的测量任务转化为简单的拖拽操作,使非专业人员也能轻松完成测量配置。 其他说明:该控件已开源发布在GitHub上,提供了完整的源代码和详细的使用指南。未来计划扩展更多高级功能,如自动路径规划和亚像素级齿轮齿距检测等。
内容概要:本文详细介绍了西门子200Smart PLC与维纶触摸屏在某疫苗车间控制系统的具体应用,涵盖配液、发酵、纯化及CIP清洗四个主要工艺环节。文中不仅展示了具体的编程代码和技术细节,还分享了许多实战经验和调试技巧。例如,在配液罐中,通过模拟量处理确保温度和液位的精确控制;发酵罐部分,着重讨论了PID参数整定和USS通讯控制变频器的方法;纯化过程中,强调了双PID串级控制的应用;CIP清洗环节,则涉及复杂的定时器逻辑和阀门联锁机制。此外,文章还提到了一些常见的陷阱及其解决方案,如通讯干扰、状态机切换等问题。 适合人群:具有一定PLC编程基础的技术人员,尤其是从事工业自动化领域的工程师。 使用场景及目标:适用于需要深入了解PLC与触摸屏集成控制系统的工程师,帮助他们在实际项目中更好地理解和应用相关技术和方法,提高系统的稳定性和可靠性。 其他说明:文章提供了大量实战经验和代码片段,有助于读者快速掌握关键技术点,并避免常见错误。同时,文中提到的一些优化措施和调试技巧对提升系统性能非常有帮助。
计算机网络课程的结课设计是使用思科模拟器搭建一个中小型校园网,当时花了几天时间查阅相关博客总算是做出来了,现在免费上传CSDN,希望小伙伴们能给博客一套三连支持
《芋道开发指南文档-2023-10-27更新》是针对软件开发者和IT专业人士的一份详尽的资源集合,旨在提供最新的开发实践、范例代码和最佳策略。这份2023年10月27日更新的文档集,包含了丰富的模板和素材,帮助开发者在日常工作中提高效率,保证项目的顺利进行。 让我们深入探讨这份文档的可能内容。"芋道"可能是一个开源项目或一个专业的技术社区,其开发指南涵盖了多个方面,例如: 1. **编程语言指南**:可能包括Java、Python、JavaScript、C++等主流语言的编码规范、最佳实践以及常见问题的解决方案。 2. **框架与库的应用**:可能会讲解React、Vue、Angular等前端框架,以及Django、Spring Boot等后端框架的使用技巧和常见应用场景。 3. **数据库管理**:涵盖了SQL语言的基本操作,数据库设计原则,以及如何高效使用MySQL、PostgreSQL、MongoDB等数据库系统。 4. **版本控制**:详细介绍了Git的工作流程,分支管理策略,以及与其他开发工具(如Visual Studio Code、IntelliJ IDEA)的集成。 5. **持续集成与持续部署(CI/CD)**:包括Jenkins、Travis CI、GitHub Actions等工具的配置和使用,以实现自动化测试和部署。 6. **云服务与容器化**:可能涉及AWS、Azure、Google Cloud Platform等云计算平台的使用,以及Docker和Kubernetes的容器化部署实践。 7. **API设计与测试**:讲解RESTful API的设计原则,Swagger的使用,以及Postman等工具进行API测试的方法。 8. **安全性与隐私保护**:涵盖OAuth、JWT认证机制,HTTPS安全通信,以及防止SQL注入、
内容概要:本文介绍了一种先进的综合能源系统优化调度模型,该模型将风电、光伏、光热发电等新能源与燃气轮机、燃气锅炉等传统能源设备相结合,利用信息间隙决策(IGDT)处理不确定性。模型中引入了P2G(电转气)装置和碳捕集技术,实现了碳经济闭环。通过多能转换和储能系统的协同调度,提高了系统的灵活性和鲁棒性。文中详细介绍了模型的关键组件和技术实现,包括IGDT的鲁棒性参数设置、P2G与碳捕集的协同控制、储能系统的三维协同调度等。此外,模型展示了在极端天气和负荷波动下的优异表现,显著降低了碳排放成本并提高了能源利用效率。 适合人群:从事能源系统优化、电力调度、碳交易等相关领域的研究人员和工程师。 使用场景及目标:适用于需要处理多种能源形式和不确定性的综合能源系统调度场景。主要目标是提高系统的灵活性、鲁棒性和经济效益,减少碳排放。 其他说明:模型具有良好的扩展性,可以通过修改配置文件轻松集成新的能源设备。代码中包含了详细的注释和公式推导,便于理解和进一步改进。
毕业设计的论文撰写、终期答辩相关的资源
该是一个在 Kaggle 上发布的数据集,专注于 2024 年出现的漏洞(CVE)信息。以下是关于该数据集的详细介绍:该数据集收集了 2024 年记录在案的各类漏洞信息,涵盖了漏洞的利用方式(Exploits)、通用漏洞评分系统(CVSS)评分以及受影响的操作系统(OS)。通过整合这些信息,研究人员和安全专家可以全面了解每个漏洞的潜在威胁、影响范围以及可能的攻击途径。数据主要来源于权威的漏洞信息平台,如美国国家漏洞数据库(NVD)等。这些数据经过整理和筛选后被纳入数据集,确保了信息的准确性和可靠性。数据集特点:全面性:涵盖了多种操作系统(如 Windows、Linux、Android 等)的漏洞信息,反映了不同平台的安全状况。实用性:CVSS 评分提供了漏洞严重程度的量化指标,帮助用户快速评估漏洞的优先级。同时,漏洞利用信息(Exploits)为安全研究人员提供了攻击者可能的攻击手段,有助于提前制定防御策略。时效性:专注于 2024 年的漏洞数据,反映了当前网络安全领域面临的新挑战和新趋势。该数据集可用于多种研究和实践场景: 安全研究:研究人员可以利用该数据集分析漏洞的分布规律、攻击趋势以及不同操作系统之间的安全差异,为网络安全防护提供理论支持。 机器学习与数据分析:数据集中的结构化信息适合用于机器学习模型的训练,例如预测漏洞的 CVSS 评分、识别潜在的高危漏洞等。 企业安全评估:企业安全团队可以参考该数据集中的漏洞信息,结合自身系统的实际情况,进行安全评估和漏洞修复计划的制定。
内容概要:本文档作为建模大赛的入门指南,详细介绍了建模大赛的概念、类型、竞赛流程、核心步骤与技巧,并提供实战案例解析。文档首先概述了建模大赛,指出其以数学、计算机技术为核心,主要分为数学建模、3D建模和AI大模型竞赛三类。接着深入解析了数学建模竞赛,涵盖组队策略(如三人分别负责建模、编程、论文写作)、时间安排(72小时内完成全流程)以及问题分析、模型建立、编程实现和论文撰写的要点。文中还提供了物流路径优化的实战案例,展示了如何将实际问题转化为图论问题并采用Dijkstra或蚁群算法求解。最后,文档推荐了不同类型建模的学习资源与工具,并给出了新手避坑建议,如避免过度复杂化模型、重视可视化呈现等。; 适合人群:对建模大赛感兴趣的初学者,特别是高校学生及希望参与数学建模竞赛的新手。; 使用场景及目标:①了解建模大赛的基本概念和分类;②掌握数学建模竞赛的具体流程与分工;③学习如何将实际问题转化为数学模型并求解;④获取实战经验和常见错误规避方法。; 其他说明:文档不仅提供了理论知识,还结合具体实例和代码片段帮助读者更好地理解和实践建模过程。建议新手从中小型赛事开始积累经验,逐步提升技能水平。
该资源为protobuf-6.30.1-cp310-abi3-win32.whl,欢迎下载使用哦!
内容概要:本文档详细介绍了基于Linux系统的大数据环境搭建流程,涵盖从虚拟机创建到集群建立的全过程。首先,通过一系列步骤创建并配置虚拟机,包括设置IP地址、安装MySQL数据库等操作。接着,重点讲解了Ambari的安装与配置,涉及关闭防火墙、设置免密登录、安装时间同步服务(ntp)、HTTP服务以及配置YUM源等关键环节。最后,完成了Ambari数据库的创建、JDK的安装、Ambari server和agent的部署,并指导用户创建集群。整个过程中还提供了针对可能出现的问题及其解决方案,确保各组件顺利安装与配置。 适合人群:具有Linux基础操作技能的数据工程师或运维人员,尤其是那些需要构建和管理大数据平台的专业人士。 使用场景及目标:适用于希望快速搭建稳定可靠的大数据平台的企业或个人开发者。通过本指南可以掌握如何利用Ambari工具自动化部署Hadoop生态系统中的各个组件,从而提高工作效率,降低维护成本。 其他说明:文档中包含了大量具体的命令行指令和配置细节,建议读者按照顺序逐步操作,并注意记录下重要的参数值以便后续参考。此外,在遇到问题时可参照提供的解决方案进行排查,必要时查阅官方文档获取更多信息。
内容概要:本文详细介绍了如何在MATLAB R2018A中使用最小均方(LMS)自适应滤波算法对一维时间序列信号进行降噪处理,特别是针对心电图(ECG)信号的应用。首先,通过生成模拟的ECG信号并加入随机噪声,创建了一个带有噪声的时间序列。然后,实现了LMS算法的核心部分,包括滤波器阶数、步长参数的选择以及权重更新规则的设计。文中还提供了详细的代码示例,展示了如何构建和训练自适应滤波器,并通过图形化方式比较了原始信号、加噪信号与经过LMS处理后的降噪信号之间的差异。此外,作者分享了一些实用的经验和技术要点,如参数选择的影响、误差曲线的解读等。 适用人群:适用于具有一定MATLAB编程基础并对信号处理感兴趣的科研人员、工程师或学生。 使用场景及目标:本教程旨在帮助读者掌握LMS算法的基本原理及其在实际项目中的应用方法,特别是在生物医学工程、机械故障诊断等领域中处理含噪信号的任务。同时,也为进一步探索其他类型的自适应滤波技术和扩展到不同的信号处理任务奠定了基础。 其他说明:尽管LMS算法在处理平稳噪声方面表现出色,但在面对突发性的强干扰时仍存在一定局限性。因此,在某些特殊场合下,可能需要与其他滤波技术相结合以获得更好的效果。
内容概要:本文详细介绍了基于TMS320F2812 DSP芯片的光伏并网逆变器设计方案,涵盖了主电路架构、控制算法、锁相环实现、环流抑制等多个关键技术点。首先,文中阐述了双级式结构的主电路设计,前级Boost升压将光伏板输出电压提升至约600V,后级采用三电平NPC拓扑的IGBT桥进行逆变。接着,深入探讨了核心控制算法,如电流PI调节器、锁相环(SOFGI)、环流抑制等,并提供了详细的MATLAB仿真模型和DSP代码实现。此外,还特别强调了PWM死区时间配置、ADC采样时序等问题的实际解决方案。最终,通过实验验证,该方案实现了THD小于3%,MPPT效率达98.7%,并有效降低了并联环流。 适合人群:从事光伏并网逆变器开发的电力电子工程师和技术研究人员。 使用场景及目标:适用于光伏并网逆变器的研发阶段,帮助工程师理解和实现高效稳定的逆变器控制系统,提高系统的性能指标,减少开发过程中常见的错误。 其他说明:文中提供的MATLAB仿真模型和DSP代码可以作为实际项目开发的重要参考资料,有助于缩短开发周期,提高成功率。
内容概要:本文详细解析了三菱FX3U PLC在六轴自动包装机中的应用,涵盖硬件配置、程序框架、伺服定位控制、手自动切换逻辑、功能块应用以及报警处理等方面。硬件方面,采用FX3U-48MT主模块自带三轴脉冲输出,配合三个FX3UG-1PG模块扩展定位功能,使用六个MR-JE-20A伺服驱动器和16点输入扩展模块进行传感器采集。程序框架主要由初始化、模式切换、六轴控制和异常处理组成。伺服定位使用DRVA指令实现双速定位模式,手自动切换逻辑通过功能块封装,确保模式切换顺畅。报警处理模块则利用矩阵扫描方式压缩报警信号,提高IO利用率。此外,程序还包括状态监控设计和原点回归等功能。 适合人群:具备一定PLC编程基础,从事自动化控制领域的工程师和技术人员。 使用场景及目标:适用于六轴自动包装机的设计与调试,帮助工程师理解和掌握三菱FX3U PLC在包装机械中的具体应用,提升系统的可靠性和效率。 其他说明:文中提供了详细的代码示例和注意事项,有助于新手避免常见错误并优化程序性能。
PPTJAVA编程190