直接上代码:
package org.apache.hadoop.mapreduce.lib.input; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.SplitCompressionInputStream; import org.apache.hadoop.io.compress.SplittableCompressionCodec; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.LineReader; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.Log; /** * Treats keys as offset in file and value as line. */ public class LineRecordReader extends RecordReader<LongWritable, Text> { private static final Log LOG = LogFactory.getLog(LineRecordReader.class); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; private Seekable filePosition; private CompressionCodec codec; // private Decompressor decompressor; public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); //根据job的配置信息,和split的信息,获取到读取实体文件的信息,这里包括文件的压缩信息。 //这里压缩的code有:DEFAULT,GZIP,BZIP2,LZO,LZ4,SNAPPY codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); if (isCompressedInput()) { //通过CodecPool的getCompressor方法获得Compressor对象,该方法需要传入一个codec, //然后Compressor对象在createOutputStream中使用,使用完毕后再通过returnCompressor放回去 decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new LineReader(cIn, job); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { in = new LineReader(codec.createInputStream(fileIn, decompressor), job); filePosition = fileIn; } } else { fileIn.seek(start); in = new LineReader(fileIn, job); filePosition = fileIn; } // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; } private boolean isCompressedInput() { return (codec != null); } private int maxBytesToConsume(long pos) { return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos); } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput() && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } //读取每一行数据的时候,都会执行nextKeyValue()方法。 //返回为true的时候,就会再调用getCurrentKey和getCurrentValue方法获取,key,value值 public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end) { //在这里进行数据读取,LineReader以\n作为分隔符,读取一行数据,放到Text value里面 //读取一行,可以参考LineReader的源码实现 newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } @Override public LongWritable getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start)); } } public synchronized void close() throws IOException { try { if (in != null) { in.close(); } } finally { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); } } } }
ref:http://blog.csdn.net/lastsweetop/article/details/9173061
http://www.myexception.cn/program/1345730.html
相关推荐
本文介绍了基于Hadoop的成绩分析系统的设计和实现,讨论了Hadoop的特点和MapReduce的应用,介绍了Hadoop集群的搭建过程和成绩分析的实现过程。该系统可以帮助高校更好地管理学生的成绩信息,提高成绩管理的效率和...
如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop...
Hadoop 的源代码分析可以帮助开发者更好地理解 Hadoop 的架构和实现机制,从而更好地使用 Hadoop 实现大数据处理和分析。 Hadoop 的关键部分集中在图中的蓝色部分,这也是我们考察的重点。Hadoop 的包的功能分析...
MapReduce则是Hadoop的数据处理模型,通过"映射"和"规约"两个阶段,实现了大规模数据集的并行计算,极大地提高了处理效率。 Hive是建立在Hadoop之上的数据仓库工具,它允许用户使用SQL-like语言进行数据查询和分析...
AQI空气质量分析-基于Hadoop MapReduce实现源代码+分析实验报告(高分完整项目)AQI空气质量分析-基于Hadoop MapReduce实现源代码+分析实验报告(高分完整项目)AQI空气质量分析-基于Hadoop MapReduce实现源代码+...
【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...
,Hadoop 技术已经在互联网领域得到了广泛的应用。...同样也得到了许多公司的青睐,如百度主要将Hadoop 应用于日志分析和网页数据库的数据 挖掘;阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。
针对本次实验,我们需要用到Hadoop集群作为模拟大数据的分析软件,集群环境必须要包括,hdfs,hbase,hive,flume,sqoop等插件,最后结合分析出来的数据进行可视化展示,需要用到Python(爬取数据集,可视化展示)...
通过对Hadoop源代码的分析,我们可以更加深入地理解Hadoop是如何通过HDFS和MapReduce实现高效数据处理的。《深入云计算 Hadoop源代码分析》这本书不仅适合已经有一定Hadoop基础的读者,也适合希望深入了解Hadoop内部...
Hadoop源代码分析完整版.pdf
在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取高频词语。 Hadoop的优势在于其高容错性和可扩展性,能够处理PB级别的数据。通过分布式计算,即使面对...
使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...
深入云计算:Hadoop源代码分析(修订版)
**基于Hadoop平台的数据仓库可行性分析报告** **1. 引言** 在信息化时代,企业对数据处理的需求日益增长,传统的数据仓库系统由于其规模、性能和灵活性的限制,已经无法满足现代企业对大数据处理的需求。Hadoop作为...
本主题将深入探讨Hadoop在数据分析中的应用及其生态系统的关键技术。 首先,我们需要理解“大数据”的概念。大数据指的是无法用传统数据库软件工具捕获、管理和处理的大规模数据集。这些数据集通常具有三个关键特征...
本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将分析结果通过可视化手段进行展示。 首先,我们需要理解Hadoop的核心组件:HDFS(Hadoop Distributed File ...
Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...
MapReduce是Hadoop的核心计算模型,它通过将大规模数据集分解为小块并并行处理,实现了高效的分布式计算。 在《Hadoop源代码分析》中,作者详细剖析了Hadoop MapReduce的工作原理,从源代码层面揭示了其内部机制。...
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...