- 浏览: 611156 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
月光杯:
问题解决了吗?
Exceptions in HDFS -
iostreamin:
神,好厉害,这是我找到的唯一可以ac的Java代码,厉害。
[leetcode] word ladder II -
standalone:
One answer I agree with:引用Whene ...
How many string objects are created? -
DiaoCow:
不错!,一开始对这些确实容易犯迷糊
erlang中的冒号 分号 和 句号 -
standalone:
Exception in thread "main& ...
one java interview question
code version: hadoop-0.19.1
首先说pread。pread会明确的把要读的size传给datanode(在new BlockReader的时候)
/** * Read bytes starting from the specified position. * * @param position start read from this position * @param buffer read buffer * @param offset offset into buffer * @param length number of bytes to read * * @return actual number of bytes read */ @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { // sanity checks checkOpen(); if (closed) { throw new IOException("Stream closed"); } long filelen = getFileLength(); if ((position < 0) || (position >= filelen)) { return -1; } int realLen = length; if ((position + length) > filelen) { realLen = (int)(filelen - position); } // determine the block and byte range within the block // corresponding to position and realLen List<LocatedBlock> blockRange = getBlockRange(position, realLen); int remaining = realLen; for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset); remaining -= bytesToRead; position += bytesToRead; offset += bytesToRead; } assert remaining == 0 : "Wrong number of bytes read."; if (stats != null) { stats.incrementBytesRead(realLen); } return realLen; }
private void fetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset) throws IOException { // // Connect to best DataNode for desired Block, with potential offset // Socket dn = null; int numAttempts = block.getLocations().length; IOException ioe = null; while (dn == null && numAttempts-- > 0 ) { long prepareRealReadStart = System.currentTimeMillis(); DNAddrPair retval = chooseDataNode(block); DatanodeInfo chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; BlockReader reader = null; try { dn = socketFactory.createSocket(); NetUtils.connect(dn, targetAddr, socketTimeout); dn.setSoTimeout(socketTimeout); int len = (int) (end - start + 1); reader = BlockReader.newBlockReader(dn, src, block.getBlock().getBlockId(), block.getBlock().getGenerationStamp(), start, len, buffersize, verifyChecksum, clientName); int nread = reader.readAll(buf, offset, len); if (nread != len) { throw new IOException("truncated return from reader.read(): " + "excpected " + len + ", got " + nread); } return; } catch (ChecksumException e) { ioe = e; LOG.warn("fetchBlockByteRange(). Got a checksum exception for " + src + " at " + block.getBlock() + ":" + e.getPos() + " from " + chosenNode.getName()); reportChecksumFailure(src, block.getBlock(), chosenNode); } catch (IOException e) { ioe = e; LOG.warn("Failed to connect to " + targetAddr + " for file " + src + " for block " + block.getBlock().getBlockId() + ":" + StringUtils.stringifyException(e)); } finally { IOUtils.closeStream(reader); IOUtils.closeSocket(dn); dn = null; } // Put chosen node into dead list, continue addToDeadNodes(chosenNode); } throw (ioe == null) ? new IOException("Could not read data") : ioe; }
pread 的过程:
根据要读的数据的offset和readLen,计算出要读的blockRange,即有哪些block在要读的范围内。具体的getBlockRange()这个函数中要判断要读的blocks是否在维护的locatedBlocks这个block cache中,如果不在,要问namenode查询,然后再放入到cache中。
然后在针对获得的blockRange中每个block读取数据,选取datanode,创建连接,对每个block都要重新生成一个BlockReader(这种实现比较废柴啊!)
然后看seek+read,read会把当前位置到block结束的长度传给datanode (也是在new BlockReader的时候)这样DataNode就可以read ahead,然后由于TCP_WINDOW的buffer作用(hadoop code里面是128K),可以加快连续读的性能。
/** * Seek to a new arbitrary location */ @Override public synchronized void seek(long targetPos) throws IOException { if (targetPos > getFileLength()) { throw new IOException("Cannot seek after EOF"); } boolean done = false; if (pos <= targetPos && targetPos <= blockEnd) { // // If this seek is to a positive position in the current // block, and this piece of data might already be lying in // the TCP buffer, then just eat up the intervening data. // int diff = (int)(targetPos - pos); if (diff <= TCP_WINDOW_SIZE) { try { pos += blockReader.skip(diff); if (pos == targetPos) { done = true; } } catch (IOException e) {//make following read to retry LOG.debug("Exception while seek to " + targetPos + " from " + currentBlock +" of " + src + " from " + currentNode + ": " + StringUtils.stringifyException(e)); } } } if (!done) { pos = targetPos; blockEnd = -1; } }
这个 seek其实是不做什么事情的(我的测试中做上万次seek,平均时间是0)。它主要移动pos这个游标:如果在当前block中,就移动到正确位置,否则,就把pos设成目标位置,但是blockEnd置成-1.这样其实最终的seek任务是在后面的read里面实现的。
看read的code:
/** * Read the entire buffer. */ @Override public synchronized int read(byte buf[], int off, int len) throws IOException { checkOpen(); if (closed) { throw new IOException("Stream closed"); } if (pos < getFileLength()) { int retries = 2; while (retries > 0) { try { if (pos > blockEnd) { currentNode = blockSeekTo(pos); } int realLen = Math.min(len, (int) (blockEnd - pos + 1)); int result = readBuffer(buf, off, realLen); if (result >= 0) { pos += result; } else { // got a EOS from reader though we expect more data on it. throw new IOException("Unexpected EOS from the reader"); } if (stats != null && result != -1) { stats.incrementBytesRead(result); } return result; } catch (ChecksumException ce) { throw ce; } catch (IOException e) { if (retries == 1) { LOG.warn("DFS Read: " + StringUtils.stringifyException(e)); } blockEnd = -1; if (currentNode != null) { addToDeadNodes(currentNode); } if (--retries == 0) { throw e; } } } } return -1; }
如果上面seek的时候要seek的位置在同一个block,现在就只需直接读好了。如果不在,刚才知道blockEnd会被置成-1,现在就要做一次真正的seek操作,函数blockSeekTo()实现这一功能。
看blockSeekTo()干了哪些事情:
/** * Open a DataInputStream to a DataNode so that it can be read from. * We get block ID and the IDs of the destinations at startup, from the namenode. */ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } if ( blockReader != null ) { blockReader.close(); blockReader = null; } if (s != null) { s.close(); s = null; } // // Compute desired block // LocatedBlock targetBlock = getBlockAt(target); assert (target==this.pos) : "Wrong postion " + pos + " expect " + target; long offsetIntoBlock = target - targetBlock.getStartOffset(); // // Connect to best DataNode for desired Block, with potential offset // DatanodeInfo chosenNode = null; while (s == null) { DNAddrPair retval = chooseDataNode(targetBlock); chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; try { s = socketFactory.createSocket(); NetUtils.connect(s, targetAddr, socketTimeout); s.setSoTimeout(socketTimeout); Block blk = targetBlock.getBlock(); blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), blk.getGenerationStamp(), offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, clientName); return chosenNode; } catch (IOException ex) { // Put chosen node into dead list, continue LOG.debug("Failed to connect to " + targetAddr + ":" + StringUtils.stringifyException(ex)); addToDeadNodes(chosenNode); if (s != null) { try { s.close(); } catch (IOException iex) { } } s = null; } } return chosenNode; }
发现这个函数跟上面的fetchBlockByteRange一样废柴,先把先前的blockReader close掉,然后再创建到目的datanode的新连接,主要不同的地方在哪里呢,在于getBlockRange和getBlockAt的区别,上面的pread模式提供了第二个参数readLen,找目标blocks的时候找这个范围内的就可以了,而seek+read这种模式假设的是seek的时候并不知道后面要读的长度,所以用了一个缺省的prefetchSize,缺省是10个block size大小。
那综合考虑两种read,作为random read的实现,第一种pread无论何时都要重新创建连接,第二种当要读的数据在当前block的时候可以重用上次的连接,理论上应该第二种效率高些。
发表评论
-
hadoop-2.2.0 build failure due to missing dependancy
2014-01-06 13:18 747The bug and fix is at https://i ... -
HDFS中租约管理源代码分析
2013-07-05 18:05 0HDFS中Client写文件的时候要获得一个租约,用来保证Cl ... -
Question on HBase source code
2013-05-22 15:05 1100I'm reading source code of hbas ... -
Using the libjars option with Hadoop
2013-05-20 15:03 965As I have said in my last post, ... -
What's Xen?
2012-12-23 17:19 1122Xen的介绍。 -
学习hadoop之基于protocol buffers的 RPC
2012-11-15 23:23 10102现在版本的hadoop各种serv ... -
学习hadoop之基于protocol buffers的 RPC
2012-11-15 22:59 2现在版本的hadoop各种server、client RPC端 ... -
Hadoop RPC 一问
2012-11-14 14:43 121看代码时候发现好像有个地方做得多余,不知道改一下会不会有好处, ... -
Hadoop Version Graph
2012-11-14 11:47 922可以到这里看全文: http://cloudblog.8km ... -
Hadoop 2.0 代码分析---MapReduce
2012-10-25 18:27 7089本文参考hadoop的版本: hadoop-2.0.1-alp ... -
how to study hadoop?
2012-04-27 15:34 1525From StackOverflow http://stack ... -
首相发怒记之hadoop篇
2012-03-23 12:14 794我在youtube上看到的,某位能翻*墙的看一下吧,挺好笑的。 ... -
Cloud Security?
2011-09-02 14:23 842看了一些文章,主要是保证用户怎么保证存储在公有云的数据的完整性 ... -
一个HDFS Error
2011-06-11 21:53 1529ERROR: hdfs.DFSClient: Excep ... -
hadoop cluster at ebay
2011-06-11 21:39 1153Friday, December 17, 2010Hadoop ... -
[转]hadoop at ebay
2011-06-11 21:09 1192http://www.ebaytechblog.com/201 ... -
【读书笔记】Data warehousing and analytics infrastructure at facebook
2011-03-18 22:03 1947这好像是sigmod2010上的paper。 读了之后做了以 ... -
cassandra example
2011-01-19 16:39 1752http://www.rackspacecloud.com/b ... -
想了解Thrift,留个记号
2011-01-19 16:35 144Thrift: Scalable Cross-Langu ... -
impact of total region numbers?
2011-01-19 16:31 925这几天tune了hbase的几个参数,有些有意思的结果。具体看 ...
相关推荐
Shell命令和java两种方式完成了常用的HDFS操作,有源代码及运行结果截图 (1)向HDFS中上传任意文本文件,如果指定的文件在HDFS中已经存在,则由用户来指定是追加到原有文件末尾还是覆盖原有的文件 (2)从HDFS中...
HDFS(Hadoop Distributed File System)是一种分布式文件系统,提供了对大规模数据的存储和管理能力。在HDFS中,基本命令是最基础也是最常用的命令,掌握这些命令是使用HDFS的基础。本节我们将详细介绍HDFS中的基本...
HDFS(Hadoop Distributed File System)是一种分布式文件系统,用于存储和管理大规模数据。它是 Hadoop 云计算平台的核心组件之一,提供了高效、可靠、可扩展的数据存储和管理解决方案。 HDFS 的优点包括: 1. 高...
在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录结构、对象序列化、...
HDFS是Hadoop分布式计算的存储基础。HDFS具有高容错性,可以部署在通用硬件设备上,适合数据密集型应用,并且提供对数据读写的高吞 吐量。HDFS能 够提供对数据的可扩展访问,通过简单地往集群里添加节点就可以解决...
hdfs文件的查看 hdfs fs -cat /文件名
Hadoop 分布式文件系统 (HDFS)是一个设计为用在普通硬件设备上的分布式文件系统。它与现有的分布式文件系统有很多近似的地方,但又和这些文件系统有很明显的不同。HDFS是高容错的,设计为部署在廉价硬件上的。HDFS对...
在Hadoop生态系统中,HDFS(Hadoop Distributed File System)是核心组件之一,它提供了可靠的、可伸缩的分布式存储。在与HDFS交互时,无论是上传文件、创建文件夹,还是从HDFS中下载数据,都需要依赖特定的Java库...
在Hadoop集群中,为了使HDFS Explorer能够通过WebHDFS接口访问HDFS,需要在Hadoop的配置文件`hdfs-site.xml`中启用WebHDFS服务。为此,你需要添加以下配置: ```xml <name>dfs.webhdfs.enabled <value>true ...
在这个“HDFS实例基本操作”中,我们将深入探讨如何在已经安装好的HDFS环境中执行基本操作,包括文件上传、下载以及创建文件夹。 一、HDFS的基本架构 HDFS基于主从结构,主要由NameNode和DataNode组成。NameNode...
我们将对HDFS和KFS在架构、设计原则、性能等方面进行深入的比较。 1. **架构设计** - HDFS采用了主从结构(Master-Slave),由一个NameNode作为主节点负责元数据管理,多个DataNode作为从节点存储数据块。这种设计...
HDFS(Hadoop Distributed File System)是 Hadoop 项目中的一部分,是一个分布式文件系统。HDFS Java API 是一组 Java 类库,提供了一组接口来操作 HDFS。下面我们将对 HDFS Java API 进行详细的介绍。 HDFS Java ...
HDFS 中 NameNode 节点的配置、备份和恢复 HDFS(Hadoop Distributed File System)是 Hadoop 生态系统中的分布式文件系统,它提供了高效、可靠、可扩展的文件存储解决方案。 NameNode 是 HDFS 集群中的中心服务器...
【HDFS 透明加密KMS】是Hadoop分布式文件系统(HDFS)提供的一种安全特性,用于保护存储在HDFS中的数据,确保数据在传输和存储时的安全性。HDFS透明加密通过端到端的方式实现了数据的加密和解密,无需修改用户的应用...
标题 "利用JAVA代码将本地文件传入HDFS中" 涉及到的是在Java编程环境中,使用Hadoop Distributed File System (HDFS) API来上传本地文件系统的文件到HDFS的过程。HDFS是Hadoop的核心组件之一,它提供了一个分布式、...
实验二:“熟悉常用的HDFS操作”旨在帮助学习者深入理解Hadoop分布式文件系统(HDFS)在大数据处理中的核心地位,以及如何通过Shell命令和Java API进行高效操作。HDFS在Hadoop架构中扮演着存储大数据的核心角色,为...
在本文中,我们将探讨一个具体的NIFI应用场景,即如何将从MySQL数据库中查询得到的JSON数据转换成TXT格式,并存储到HDFS(Hadoop分布式文件系统)中。这个场景在大数据处理和分析中非常常见,因为MySQL是常用的关系...