- 浏览: 535109 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
飞天奔月:
public List<String> gener ...
实践中的重构30_不做油漆匠 -
在世界的中心呼喚愛:
在世界的中心呼喚愛 写道public class A {
...
深入理解ReferenceQueue GC finalize Reference -
在世界的中心呼喚愛:
在世界的中心呼喚愛 写道在世界的中心呼喚愛 写道在classB ...
深入理解ReferenceQueue GC finalize Reference -
在世界的中心呼喚愛:
在世界的中心呼喚愛 写道在classB的finalize上打断 ...
深入理解ReferenceQueue GC finalize Reference -
在世界的中心呼喚愛:
iteye比较少上,如果可以的话,可以发e-mail交流:ch ...
深入理解ReferenceQueue GC finalize Reference
一次hadoop的read
getFileSystem
代码
Configuration
Configuration基本就是一个空对象。添加了2个配置文件到资源列表。
第一次通过Configuration获取param时才触发资源加载解析。
文件系统的cache
由URI uri, Configuration conf作为key,对FileSystem做了缓存。
初始化文件系统
由config中的fs.hdfs.impl得到文件系统的实现类。这里就是org.apache.hadoop.hdfs.DistributedFileSystem。初始化DistributedFileSystem,这样DistributedFileSystem就可以和namenode通信了。
Read file content
代码
解析path
文件的path为
hdfs://192.168.81.130:9001/user/allen/input4wordcount/test_text_01.txt
解析后为
Scheme hdfs
Authority 192.168.81.130:9001
Path /user/allen/input4wordcount/test_text_01.txt
打开FSDataInputStream
联系namenode取到block信息,注意这里是一个范围查询。查询结果缓存起来。
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
prefetchSize = 671088640
在cache中查找block
注意这里的Comparator有一个特殊处理。为了fake key可以和待查找的LocatedBlock相等。
如果cache不命中则重新查询namenode
更新原有cache
选择datanode
当和datanode建立连接时,如果出错。则3秒后(程序hard code)联系namenode重新获取datanode的信息。当重试超过一定次数时,则报错。
建立连接,读取内容
注意这里有一个简单的文件协议。
getFileSystem
代码
public static FileSystem getFileSystem() throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get( URI.create("hdfs://192.168.81.130:9001"), conf); return fs; }
Configuration
Configuration基本就是一个空对象。添加了2个配置文件到资源列表。
addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml");
第一次通过Configuration获取param时才触发资源加载解析。
文件系统的cache
static class Cache { private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>(); FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); FileSystem fs = null; synchronized (this) { fs = map.get(key); } if (fs != null) { return fs; } fs = createFileSystem(uri, conf); synchronized (this) { // refetch the lock again FileSystem oldfs = map.get(key); if (oldfs != null) { // a file system is created while lock is releasing fs.close(); // close the new file system return oldfs; // return the old file system } // now insert the new file system into the map if (map.isEmpty() && !clientFinalizer.isAlive()) { Runtime.getRuntime().addShutdownHook(clientFinalizer); } fs.key = key; map.put(key, fs); return fs; } }
由URI uri, Configuration conf作为key,对FileSystem做了缓存。
初始化文件系统
private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException { Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); LOG.debug("Creating filesystem for " + uri); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs; }
由config中的fs.hdfs.impl得到文件系统的实现类。这里就是org.apache.hadoop.hdfs.DistributedFileSystem。初始化DistributedFileSystem,这样DistributedFileSystem就可以和namenode通信了。
Read file content
代码
/** * linux cat file. * */ public static void readFile(String path) throws Exception { System.out.println("--------------------------------------"); System.out.println("reading file on path = " + path); FileSystem fs = Common.getFileSystem(); InputStream in = null; try { in = fs.open(new Path(path)); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } System.out.println("--------------------------------------"); }
解析path
文件的path为
hdfs://192.168.81.130:9001/user/allen/input4wordcount/test_text_01.txt
解析后为
Scheme hdfs
Authority 192.168.81.130:9001
Path /user/allen/input4wordcount/test_text_01.txt
打开FSDataInputStream
联系namenode取到block信息,注意这里是一个范围查询。查询结果缓存起来。
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
prefetchSize = 671088640
在cache中查找block
public int findBlock(long offset) { // create fake block of size 1 as a key LocatedBlock key = new LocatedBlock(); key.setStartOffset(offset); key.getBlock().setNumBytes(1); Comparator<LocatedBlock> comp = new Comparator<LocatedBlock>() { // Returns 0 iff a is inside b or b is inside a public int compare(LocatedBlock a, LocatedBlock b) { long aBeg = a.getStartOffset(); long bBeg = b.getStartOffset(); long aEnd = aBeg + a.getBlockSize(); long bEnd = bBeg + b.getBlockSize(); if(aBeg <= bBeg && bEnd <= aEnd || bBeg <= aBeg && aEnd <= bEnd) return 0; // one of the blocks is inside the other if(aBeg < bBeg) return -1; // a's left bound is to the left of the b's return 1; } }; return Collections.binarySearch(blocks, key, comp); }
注意这里的Comparator有一个特殊处理。为了fake key可以和待查找的LocatedBlock相等。
如果cache不命中则重新查询namenode
int targetBlockIdx = locatedBlocks.findBlock(offset); if (targetBlockIdx < 0) { // block is not cached targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); // fetch more blocks LocatedBlocks newBlocks; newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize); assert (newBlocks != null) : "Could not find target position " + offset; locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); }
更新原有cache
public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) { int oldIdx = blockIdx; int insStart = 0, insEnd = 0; for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); newIdx++) { long newOff = newBlocks.get(newIdx).getStartOffset(); long oldOff = blocks.get(oldIdx).getStartOffset(); if(newOff < oldOff) { insEnd++; } else if(newOff == oldOff) { // replace old cached block by the new one blocks.set(oldIdx, newBlocks.get(newIdx)); if(insStart < insEnd) { // insert new blocks blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd)); oldIdx += insEnd - insStart; } insStart = insEnd = newIdx+1; oldIdx++; } else { // newOff > oldOff assert false : "List of LocatedBlock must be sorted by startOffset"; } } insEnd = newBlocks.size(); if(insStart < insEnd) { // insert new blocks blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd)); } }
选择datanode
/** * Pick the best node from which to stream the data. * Entries in <i>nodes</i> are already in the priority order */ private DatanodeInfo bestNode(DatanodeInfo nodes[], AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes) throws IOException { if (nodes != null) { for (int i = 0; i < nodes.length; i++) { if (!deadNodes.containsKey(nodes[i])) { return nodes[i]; } } } throw new IOException("No live nodes contain current block"); }
当和datanode建立连接时,如果出错。则3秒后(程序hard code)联系namenode重新获取datanode的信息。当重试超过一定次数时,则报错。
建立连接,读取内容
注意这里有一个简单的文件协议。
发表评论
-
hbase分页功能的几种实现方案
2015-01-13 23:52 5453hbase分页功能的几种实现方案。 分页功能是线上系统的常用 ... -
simplehbase v0.98.1开始支持hbase0.98
2014-12-29 21:48 1077https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v1.0简介
2014-12-13 18:55 1400https://github.com/zhang-xzhi/s ... -
hbase put UML图
2014-12-11 23:40 1301create Htable put hbase rpc ... -
hbase开发问题-PooledHTable多次close导致问题
2014-12-11 23:27 2065PooledHTable多次close导致问题 Pooled ... -
hbase的CoprocessorProtocol及一个简单的通用扩展实现V2
2014-12-04 18:00 2110hbase中的CoprocessorProtocol机制. ... -
hbase 0.94.0 0.94.9 0.94.24 功能不兼容初步分析
2014-12-04 16:10 1115hbase 0.94.0 0.94.9 0.94.24 功能不 ... -
simplehbase对JOPO新增xml配置和无配置方式
2014-10-24 22:50 994simplehbase介绍文章如下: https://gith ... -
hbase轻量级中间件simplehbase v0.9简介
2014-07-14 13:57 638https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v0.8简介
2014-04-28 21:44 3769https://github.com/zhang-xzhi/s ... -
hbase开发问题-hbase版本号报错
2014-04-22 19:19 2770由于使用了自定义的classloader,导致报错。 p ... -
HBase Client使用注意点
2014-04-21 12:51 2554HBase Client使用注意点: 1 HTable线程 ... -
hbase开发问题-hbase-0.94.0的ServerCallable callTimeout处理有问题
2014-04-14 22:07 2240读hbase-0.94.0的ServerCallable时,发 ... -
Phoenix和simplehbase功能简单比较
2014-04-02 17:20 1631Phoenix和simplehbase功能简单比较 大数据应 ... -
hbase web console simplehbaseviewer
2014-03-12 19:11 1249https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v0.2简介
2013-12-19 23:51 1693https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v0.1简介
2013-10-09 19:29 1557simplehbase尝试简化基于hbase的java应用开发 ... -
hbase的CoprocessorProtocol及一个简单的通用扩展实现V1
2013-08-18 14:15 5304hbase的CoprocessorProtocol及一个简单的 ... -
hbase的基本操作
2013-08-18 14:02 4486本文列举一些hbase的基本操作代码。 package ... -
hadoop_hadoop的map reduce
2011-11-09 21:21 1254这个根据功能模块分为 ...
相关推荐
标题 "hadoop-streaming-2.8.0_jar_2.8.0_hadoop_streaming_" 暗示我们正在讨论的是 Hadoop Streaming 的一个版本,具体是2.8.0。Hadoop Streaming 是一个 Hadoop 组件,允许用户使用可执行的脚本(如 Python 或 ...
Hadoop是Apache软件基金会开发的一个开源框架,主要设计用于处理和存储海量数据。它采用了分布式计算模型,使得在大规模集群上处理数据变得高效且可靠。HDFS(Hadoop Distributed File System)是Hadoop的核心组件之...
标题 "hadoop_hadoop-2.7.2-hbase-jar.rar" 提供的信息表明,这是一个与Hadoop相关的压缩文件,具体来说是Hadoop 2.7.2版本的HBase JAR文件。Hadoop是一个开源框架,主要用于分布式存储和处理大数据。而HBase是建立...
Hadoop分布式文件系统(HDFS)是Apache Hadoop项目的核心组件之一,它为大规模数据处理提供了可扩展、高容错性的存储解决方案。本资料集围绕“hdfs_design.rar”这个压缩包,详细介绍了HDFS的原理、操作以及在Java...
当一个节点失败时,Hadoop能够自动重定向到其他节点读取数据,保证服务的连续性。 在MapReduce初级案例中,通常会涵盖如何编写Map和Reduce函数,如何配置和运行Hadoop作业,以及如何解析和理解输出结果。开发者通常...
随着大数据处理需求的日益增长,Hadoop生态中的HBase因其卓越的数据处理能力和灵活性,成为了众多企业的大数据解决方案之一。本文旨在帮助初次接触HBase的业务开发与测试人员快速理解HBase的基本概念和技术要点,...
HDFS是Hadoop的核心组件之一,它是一种分布式文件系统,能够将大量数据存储在由多台普通服务器组成的集群上。通过Java API,我们可以实现对HDFS的基本操作,如文件的创建、读取、写入和删除。在`HadoopDemo`项目中,...
2. **读取文件**:使用`FileSystem`实例的`open(Path path)`方法打开文件,返回一个`FSDataInputStream`,然后可以读取数据。 3. **写入文件**:通过`create(Path path, boolean overwrite)`创建新文件或覆盖已存在...
在分布式计算领域,Hadoop是一个不可或缺的名字,它提供了一个开源框架,用于存储和处理大量数据。HDFS(Hadoop Distributed File System)则是Hadoop的核心组件之一,负责数据的分布式存储。本篇将深入探讨Hadoop...
《Hadoop_2.X_HDFS源码剖析》是由徐鹏编著的一本深入解析Hadoop 2.x版本中HDFS(Hadoop Distributed File System)源码的专业书籍。这本书旨在帮助读者理解HDFS的核心机制,提升在分布式存储系统方面的专业技能。 ...
标题中的“MR.rar_hadoop_mapReduce_paidabk”暗示了这是一个与Hadoop MapReduce相关的压缩文件,其中可能包含了实现MapReduce算法的源代码以及与Hadoop框架相关的JAR包。这个压缩包很可能是为了帮助开发者理解和...
此外,压缩包中的“源码”部分可能包含Hadoop的源代码,这对于深入理解Hadoop的工作原理和进行二次开发非常有帮助。你可以通过阅读源码了解Hadoop内部如何处理数据分发、容错机制、数据块复制等关键功能。 总的来说...
Hadoop是Apache软件基金会的一个开源项目,它提供了一个分布式文件系统(HDFS)和一个并行计算框架(MapReduce),为大数据处理提供了强大的支持。这个名为“hadoop_note.zip”的压缩包,很可能是包含了一份关于...
在IT行业中,大数据处理是一个至关重要的领域,而Hadoop作为其中的明星框架,为企业提供了高效、可扩展的数据处理解决方案。本篇文章将详细讲解Hadoop在CentOS操作系统上的安装与配置,帮助你深入理解Hadoop集群的...
Hadoop的设计理念是“一次写入,多次读取”(WAL),这允许数据在写入后保持不变,从而提高查询效率。此外,Hadoop具有弹性,可以轻松地添加或移除节点以适应不断变化的数据量和需求。 在实际应用中,Hadoop通常用于...
它在Hadoop之上提供实时读写访问,适用于需要快速随机读取的大数据存储。HBase有强大的索引和查询能力,尤其适合实时分析。HBase的表由行、列族、列和时间戳组成,这种结构使得数据可以根据不同的维度进行快速查询。...
Hadoop是Apache软件基金会开发的一个开源项目,其核心设计思想是实现大数据的分布式存储和处理。Hadoop的主要组件包括Hadoop Distributed File System (HDFS)和MapReduce。HDFS是一种高度容错性的分布式文件系统,...
Hadoop是一个开源的分布式计算框架,它为处理和存储大量数据提供了基础架构。在Hadoop中,倒排索引可以被用来加速MapReduce任务的执行,尤其是在执行搜索或者数据分析时。Hadoop通过其HDFS(Hadoop Distributed File...
- 流式数据访问:适合一次性写入、多次读取的场景,优化了大数据读取效率。 - 块级存储:大文件被分割成固定大小的块,便于分布式存储和处理。 2. **MapReduce**:Hadoop的另一核心组件,处理海量数据的计算模型...