`
dacoolbaby
  • 浏览: 1267183 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Hadoop LineRecordReader实现分析

阅读更多

直接上代码:

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Treats keys as offset in file and value as line. 
 */
public class LineRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;
  private Seekable filePosition;
  private CompressionCodec codec; //
  private Decompressor decompressor;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
	//根据job的配置信息,和split的信息,获取到读取实体文件的信息,这里包括文件的压缩信息。
	//这里压缩的code有:DEFAULT,GZIP,BZIP2,LZO,LZ4,SNAPPY
    codec = compressionCodecs.getCodec(file); 

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

    if (isCompressedInput()) {
	//通过CodecPool的getCompressor方法获得Compressor对象,该方法需要传入一个codec,
	//然后Compressor对象在createOutputStream中使用,使用完毕后再通过returnCompressor放回去
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new LineReader(cIn, job);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new LineReader(codec.createInputStream(fileIn, decompressor),
            job);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new LineReader(fileIn, job);
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
  
  private boolean isCompressedInput() {
    return (codec != null);
  }

  private int maxBytesToConsume(long pos) {
    return isCompressedInput()
      ? Integer.MAX_VALUE
      : (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput() && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }

//读取每一行数据的时候,都会执行nextKeyValue()方法。
//返回为true的时候,就会再调用getCurrentKey和getCurrentValue方法获取,key,value值
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
	//在这里进行数据读取,LineReader以\n作为分隔符,读取一行数据,放到Text value里面
     //读取一行,可以参考LineReader的源码实现
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
      }
    }
  }
}

 

ref:http://blog.csdn.net/lastsweetop/article/details/9173061

http://www.myexception.cn/program/1345730.html

 

分享到:
评论

相关推荐

    基于Hadoop的成绩分析系统.docx

    本文介绍了基于Hadoop的成绩分析系统的设计和实现,讨论了Hadoop的特点和MapReduce的应用,介绍了Hadoop集群的搭建过程和成绩分析的实现过程。该系统可以帮助高校更好地管理学生的成绩信息,提高成绩管理的效率和...

    如何使用hadoop进行数据分析.zip

    如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop...

    Hadoop源代码分析(完整版).pdf

    Hadoop 的源代码分析可以帮助开发者更好地理解 Hadoop 的架构和实现机制,从而更好地使用 Hadoop 实现大数据处理和分析。 Hadoop 的关键部分集中在图中的蓝色部分,这也是我们考察的重点。Hadoop 的包的功能分析...

    基于Hadoop数据分析系统设计(需求分析).docx

    MapReduce则是Hadoop的数据处理模型,通过"映射"和"规约"两个阶段,实现了大规模数据集的并行计算,极大地提高了处理效率。 Hive是建立在Hadoop之上的数据仓库工具,它允许用户使用SQL-like语言进行数据查询和分析...

    基于Hadoop豆瓣电影数据分析实验报告

    【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...

    AQI空气质量分析-基于Hadoop MapReduce实现源代码+分析实验报告(高分完整项目)

    AQI空气质量分析-基于Hadoop MapReduce实现源代码+分析实验报告(高分完整项目)AQI空气质量分析-基于Hadoop MapReduce实现源代码+分析实验报告(高分完整项目)AQI空气质量分析-基于Hadoop MapReduce实现源代码+...

    Hadoop应用案例分析:雅虎、eBay、百度、Facebook.pdf

    ,Hadoop 技术已经在互联网领域得到了广泛的应用。...同样也得到了许多公司的青睐,如百度主要将Hadoop 应用于日志分析和网页数据库的数据 挖掘;阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。

    Hadoop豆瓣电影分析可视化源码

    针对本次实验,我们需要用到Hadoop集群作为模拟大数据的分析软件,集群环境必须要包括,hdfs,hbase,hive,flume,sqoop等插件,最后结合分析出来的数据进行可视化展示,需要用到Python(爬取数据集,可视化展示)...

    深入云计算 Hadoop源代码分析

    通过对Hadoop源代码的分析,我们可以更加深入地理解Hadoop是如何通过HDFS和MapReduce实现高效数据处理的。《深入云计算 Hadoop源代码分析》这本书不仅适合已经有一定Hadoop基础的读者,也适合希望深入了解Hadoop内部...

    Hadoop源代码分析完整版.pdf

    Hadoop源代码分析完整版.pdf

    基于Hadoop的电影影评数据分析

    在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取高频词语。 Hadoop的优势在于其高容错性和可扩展性,能够处理PB级别的数据。通过分布式计算,即使面对...

    使用hadoop进行天气数据分析.zip

    使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...

    深入云计算:Hadoop源代码分析(修订版)

    深入云计算:Hadoop源代码分析(修订版)

    构建企业级数仓-Hadoop可行性分析报告.docx

    **基于Hadoop平台的数据仓库可行性分析报告** **1. 引言** 在信息化时代,企业对数据处理的需求日益增长,传统的数据仓库系统由于其规模、性能和灵活性的限制,已经无法满足现代企业对大数据处理的需求。Hadoop作为...

    Hadoop数据分析_大数据_hadoop_数据分析_

    本主题将深入探讨Hadoop在数据分析中的应用及其生态系统的关键技术。 首先,我们需要理解“大数据”的概念。大数据指的是无法用传统数据库软件工具捕获、管理和处理的大规模数据集。这些数据集通常具有三个关键特征...

    Hadoop源码分析(完整版)

    Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...

    Hadoop源代码分析

    MapReduce是Hadoop的核心计算模型,它通过将大规模数据集分解为小块并并行处理,实现了高效的分布式计算。 在《Hadoop源代码分析》中,作者详细剖析了Hadoop MapReduce的工作原理,从源代码层面揭示了其内部机制。...

    Hadoop之外卖订单数据分析系统

    本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将分析结果通过可视化手段进行展示。 首先,我们需要理解Hadoop的核心组件:HDFS(Hadoop Distributed File ...

    基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip

    基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...

Global site tag (gtag.js) - Google Analytics