`
zhang_xzhi_xjtu
  • 浏览: 535109 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hadoop_hadoop的一次读取

 
阅读更多
一次hadoop的read
getFileSystem
代码
	public static FileSystem getFileSystem() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(
				URI.create("hdfs://192.168.81.130:9001"), conf);
		return fs;
	}




Configuration
Configuration基本就是一个空对象。添加了2个配置文件到资源列表。
    addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");

第一次通过Configuration获取param时才触发资源加载解析。



文件系统的cache
static class Cache {
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = null;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
      
      fs = createFileSystem(uri, conf);
      synchronized (this) {  // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
        return fs;
      }
}

由URI uri, Configuration conf作为key,对FileSystem做了缓存。



初始化文件系统
  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    LOG.debug("Creating filesystem for " + uri);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }

由config中的fs.hdfs.impl得到文件系统的实现类。这里就是org.apache.hadoop.hdfs.DistributedFileSystem。初始化DistributedFileSystem,这样DistributedFileSystem就可以和namenode通信了。



Read file content
代码
	/**
	 * linux cat file.
	 * */
	public static void readFile(String path) throws Exception {

		System.out.println("--------------------------------------");
		System.out.println("reading file on path = " + path);

		FileSystem fs = Common.getFileSystem();

		InputStream in = null;
		try {
			in = fs.open(new Path(path));
			IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(in);
		}
		System.out.println("--------------------------------------");
	}



解析path
文件的path为
hdfs://192.168.81.130:9001/user/allen/input4wordcount/test_text_01.txt
解析后为
Scheme hdfs
Authority 192.168.81.130:9001
Path /user/allen/input4wordcount/test_text_01.txt



打开FSDataInputStream
联系namenode取到block信息,注意这里是一个范围查询。查询结果缓存起来。
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
prefetchSize = 671088640



在cache中查找block
public int findBlock(long offset) {
    // create fake block of size 1 as a key
    LocatedBlock key = new LocatedBlock();
    key.setStartOffset(offset);
    key.getBlock().setNumBytes(1);
    Comparator<LocatedBlock> comp = 
      new Comparator<LocatedBlock>() {
        // Returns 0 iff a is inside b or b is inside a
        public int compare(LocatedBlock a, LocatedBlock b) {
          long aBeg = a.getStartOffset();
          long bBeg = b.getStartOffset();
          long aEnd = aBeg + a.getBlockSize();
          long bEnd = bBeg + b.getBlockSize();
          if(aBeg <= bBeg && bEnd <= aEnd 
              || bBeg <= aBeg && aEnd <= bEnd)
            return 0; // one of the blocks is inside the other
          if(aBeg < bBeg)
            return -1; // a's left bound is to the left of the b's
          return 1;
        }
      };
    return Collections.binarySearch(blocks, key, comp);
  }

注意这里的Comparator有一个特殊处理。为了fake key可以和待查找的LocatedBlock相等。



如果cache不命中则重新查询namenode
      int targetBlockIdx = locatedBlocks.findBlock(offset);
      if (targetBlockIdx < 0) { // block is not cached
        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
        // fetch more blocks
        LocatedBlocks newBlocks;
        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
        assert (newBlocks != null) : "Could not find target position " + offset;
        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
      }

更新原有cache
public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) {
    int oldIdx = blockIdx;
    int insStart = 0, insEnd = 0;
    for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); 
                                                        newIdx++) {
      long newOff = newBlocks.get(newIdx).getStartOffset();
      long oldOff = blocks.get(oldIdx).getStartOffset();
      if(newOff < oldOff) {
        insEnd++;
      } else if(newOff == oldOff) {
        // replace old cached block by the new one
        blocks.set(oldIdx, newBlocks.get(newIdx));
        if(insStart < insEnd) { // insert new blocks
          blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
          oldIdx += insEnd - insStart;
        }
        insStart = insEnd = newIdx+1;
        oldIdx++;
      } else {  // newOff > oldOff
        assert false : "List of LocatedBlock must be sorted by startOffset";
      }
    }
    insEnd = newBlocks.size();
    if(insStart < insEnd) { // insert new blocks
      blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
    }
  }



选择datanode
  /**
   * Pick the best node from which to stream the data.
   * Entries in <i>nodes</i> are already in the priority order
   */
  private DatanodeInfo bestNode(DatanodeInfo nodes[], 
                                AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
                                throws IOException {
    if (nodes != null) { 
      for (int i = 0; i < nodes.length; i++) {
        if (!deadNodes.containsKey(nodes[i])) {
          return nodes[i];
        }
      }
    }
    throw new IOException("No live nodes contain current block");
  }

当和datanode建立连接时,如果出错。则3秒后(程序hard code)联系namenode重新获取datanode的信息。当重试超过一定次数时,则报错。



建立连接,读取内容
注意这里有一个简单的文件协议。

1
1
分享到:
评论
1 楼 inuyasha027 2013-05-01  
这是一篇很好的文章,学习了,谢谢。

相关推荐

    hadoop-streaming-2.8.0_jar_2.8.0_hadoop_streaming_

    标题 "hadoop-streaming-2.8.0_jar_2.8.0_hadoop_streaming_" 暗示我们正在讨论的是 Hadoop Streaming 的一个版本,具体是2.8.0。Hadoop Streaming 是一个 Hadoop 组件,允许用户使用可执行的脚本(如 Python 或 ...

    hdfs.rar_hadoop_hadoop ubuntu_hdfs_分布式系统_基于hadoop

    Hadoop是Apache软件基金会开发的一个开源框架,主要设计用于处理和存储海量数据。它采用了分布式计算模型,使得在大规模集群上处理数据变得高效且可靠。HDFS(Hadoop Distributed File System)是Hadoop的核心组件之...

    hadoop_hadoop-2.7.2-hbase-jar.rar linux下包

    标题 "hadoop_hadoop-2.7.2-hbase-jar.rar" 提供的信息表明,这是一个与Hadoop相关的压缩文件,具体来说是Hadoop 2.7.2版本的HBase JAR文件。Hadoop是一个开源框架,主要用于分布式存储和处理大数据。而HBase是建立...

    hdfs_design.rar_HDFS-OPERATE_hadoop_hadoop java_hdfs

    Hadoop分布式文件系统(HDFS)是Apache Hadoop项目的核心组件之一,它为大规模数据处理提供了可扩展、高容错性的存储解决方案。本资料集围绕“hdfs_design.rar”这个压缩包,详细介绍了HDFS的原理、操作以及在Java...

    细细品味Hadoop_Hadoop集群(第9期)_MapReduce初级案例

    当一个节点失败时,Hadoop能够自动重定向到其他节点读取数据,保证服务的连续性。 在MapReduce初级案例中,通常会涵盖如何编写Map和Reduce函数,如何配置和运行Hadoop作业,以及如何解析和理解输出结果。开发者通常...

    细细品味Hadoop_Hadoop集群(第11期副刊)_HBase之旅.pdf

    随着大数据处理需求的日益增长,Hadoop生态中的HBase因其卓越的数据处理能力和灵活性,成为了众多企业的大数据解决方案之一。本文旨在帮助初次接触HBase的业务开发与测试人员快速理解HBase的基本概念和技术要点,...

    HadoopDemo_hadoopDemo_nationhb8_hadoop_源码

    HDFS是Hadoop的核心组件之一,它是一种分布式文件系统,能够将大量数据存储在由多台普通服务器组成的集群上。通过Java API,我们可以实现对HDFS的基本操作,如文件的创建、读取、写入和删除。在`HadoopDemo`项目中,...

    Java-API-Operate-Hadoop.rar_hadoop_hadoop api

    2. **读取文件**:使用`FileSystem`实例的`open(Path path)`方法打开文件,返回一个`FSDataInputStream`,然后可以读取数据。 3. **写入文件**:通过`create(Path path, boolean overwrite)`创建新文件或覆盖已存在...

    HDFS.zip_Hadoop 平台_hadoop_hdfs

    在分布式计算领域,Hadoop是一个不可或缺的名字,它提供了一个开源框架,用于存储和处理大量数据。HDFS(Hadoop Distributed File System)则是Hadoop的核心组件之一,负责数据的分布式存储。本篇将深入探讨Hadoop...

    Hadoop_2.X_HDFS源码剖析_带索引书签目录_徐鹏

    《Hadoop_2.X_HDFS源码剖析》是由徐鹏编著的一本深入解析Hadoop 2.x版本中HDFS(Hadoop Distributed File System)源码的专业书籍。这本书旨在帮助读者理解HDFS的核心机制,提升在分布式存储系统方面的专业技能。 ...

    MR.rar_hadoop_mapReduce_paidabk

    标题中的“MR.rar_hadoop_mapReduce_paidabk”暗示了这是一个与Hadoop MapReduce相关的压缩文件,其中可能包含了实现MapReduce算法的源代码以及与Hadoop框架相关的JAR包。这个压缩包很可能是为了帮助开发者理解和...

    Hadoop平台搭建方案_hadoop_源码.zip

    此外,压缩包中的“源码”部分可能包含Hadoop的源代码,这对于深入理解Hadoop的工作原理和进行二次开发非常有帮助。你可以通过阅读源码了解Hadoop内部如何处理数据分发、容错机制、数据块复制等关键功能。 总的来说...

    hadoop_note.zip

    Hadoop是Apache软件基金会的一个开源项目,它提供了一个分布式文件系统(HDFS)和一个并行计算框架(MapReduce),为大数据处理提供了强大的支持。这个名为“hadoop_note.zip”的压缩包,很可能是包含了一份关于...

    细细品味Hadoop_Hadoop集群CentOS安装配置.rar_hadoop

    在IT行业中,大数据处理是一个至关重要的领域,而Hadoop作为其中的明星框架,为企业提供了高效、可扩展的数据处理解决方案。本篇文章将详细讲解Hadoop在CentOS操作系统上的安装与配置,帮助你深入理解Hadoop集群的...

    第3章Hadoop础述_hadoop_

    Hadoop的设计理念是“一次写入,多次读取”(WAL),这允许数据在写入后保持不变,从而提高查询效率。此外,Hadoop具有弹性,可以轻松地添加或移除节点以适应不断变化的数据量和需求。 在实际应用中,Hadoop通常用于...

    hadoop_hbase_pig

    它在Hadoop之上提供实时读写访问,适用于需要快速随机读取的大数据存储。HBase有强大的索引和查询能力,尤其适合实时分析。HBase的表由行、列族、列和时间戳组成,这种结构使得数据可以根据不同的维度进行快速查询。...

    hadoop_definitive_guide_4th(hadoop权威指南第四版)

    Hadoop是Apache软件基金会开发的一个开源项目,其核心设计思想是实现大数据的分布式存储和处理。Hadoop的主要组件包括Hadoop Distributed File System (HDFS)和MapReduce。HDFS是一种高度容错性的分布式文件系统,...

    InvertedIndex_expectn1h_hadoop_zip_

    Hadoop是一个开源的分布式计算框架,它为处理和存储大量数据提供了基础架构。在Hadoop中,倒排索引可以被用来加速MapReduce任务的执行,尤其是在执行搜索或者数据分析时。Hadoop通过其HDFS(Hadoop Distributed File...

    Hadoop_in_Action

    - 流式数据访问:适合一次性写入、多次读取的场景,优化了大数据读取效率。 - 块级存储:大文件被分割成固定大小的块,便于分布式存储和处理。 2. **MapReduce**:Hadoop的另一核心组件,处理海量数据的计算模型...

Global site tag (gtag.js) - Google Analytics