package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
/** An {@link RecordReader} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
/**
*hadoop中一个读取文件中某个片段(由其内部的start、length决定片段)的类,继承自RecordReader,可以作为InputFomat中 public *RecordReader<K, V> getRecordReader(InputSplit split,JobConf job, Reporter reporter)方法的返回的结果,处理FileSplit 。
* K V对应map中K V,其每次取得K V,要作为map的输入来做的
**/
public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
private SequenceFile.Reader in; //读取的Reader
private long start; //文件在Reader的开始位置
private long end; //结束位置,一个hdfs文件在Reader中是连续的?
private boolean more = true;
protected Configuration conf;
public SequenceFileRecordReader(Configuration conf, FileSplit split)
throws IOException {
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
this.in = new SequenceFile.Reader(fs, path, conf);
this.end = split.getStart() + split.getLength();
this.conf = conf;
if (split.getStart() > in.getPosition())
in.sync(split.getStart()); // sync to start
this.start = in.getPosition(); // 一个hdfs的开始位置,但不是在Reader中的开始位置,是以取Reader的开始位置 ??
more = start < end;
}
/** The class of key that must be passed to {@link
* #next(Object, Object)}.. */
public Class getKeyClass() { return in.getKeyClass(); }
/** The class of value that must be passed to {@link
* #next(Object, Object)}.. */
public Class getValueClass() { return in.getValueClass(); }
/**
*每次调用next(K key, V value)方法需要传入相应的K 和 V 对象,是以以下像个方法构造K 、V的实例,
*以便调用next(K key, V value),传给map方法
*
*/
@SuppressWarnings("unchecked")
public K createKey() {
return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
}
@SuppressWarnings("unchecked")
public V createValue() {
return (V) ReflectionUtils.newInstance(getValueClass(), conf);
}
/**
*取值的函数
*/
public synchronized boolean next(K key, V value) throws IOException {
if (!more) return false;
long pos = in.getPosition();
boolean remaining = (in.next(key) != null);
if (remaining) {
getCurrentValue(value);
}
if (pos >= end && in.syncSeen()) {
more = false;
} else {
more = remaining;
}
return more;
}
/**
*用于跳过某个key
*/
protected synchronized boolean next(K key)
throws IOException {
if (!more) return false;
long pos = in.getPosition();
boolean remaining = (in.next(key) != null);
if (pos >= end && in.syncSeen()) {
more = false;
} else {
more = remaining;
}
return more;
}
protected synchronized void getCurrentValue(V value)
throws IOException {
in.getCurrentValue(value);
}
/**
* Return the progress within the input split
* @return 0.0 to 1.0 of the input byte range
*/
public float getProgress() throws IOException {
if (end == start) {
return 0.0f;
} else {
return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
}
}
public synchronized long getPos() throws IOException {
return in.getPosition();
}
protected synchronized void seek(long pos) throws IOException {
in.seek(pos);
}
public synchronized void close() throws IOException { in.close(); }
}
hadoop中一个读取文件中某个片段(由其内部的start、length决定片段)的类,继承自RecordReader,可以作为InputFomat中 public RecordReader<K, V> getRecordReader(InputSplit split,JobConf job, Reporter reporter)方法的返回的结果,处理FileSplit 。
可以读取FileSplit中的key、Value对,以便map可以使用 。
分享到:
相关推荐
【标题】:“Eclipse Hadoop 例子源代码” 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储和计算的能力。Eclipse作为Java开发的主流集成开发环境(IDE),也是编写和调试Hadoop程序的重要...
本资源"hadop实战源代码Java"聚焦于利用Java编程语言与Hadoop Distributed File System (HDFS)进行交互,实现文件的上传、下载以及删除等核心功能。这里我们将深入探讨这些知识点。 首先,了解Hadoop的基本架构是...
这个【标题】"hadoop-2.0.4官方源代码"代表着Hadoop的2.0.4版本的原始代码,这是一次重要的发布,因为它引入了许多改进和新特性。【描述】"hadoop-2.0.4官方源代码"简洁明了地指出了该压缩包的内容,即Hadoop的源码...
本文将深入Hadoop的源代码,探讨其设计原理和实现机制,帮助开发者更好地理解和优化Hadoop系统。 Hadoop的源代码分析可以从以下几个关键点开始: 1. **HDFS架构**:HDFS是一个分布式文件系统,设计用于在廉价硬件...
Hadoop是Apache软件基金...总的来说,"Hadoop 归档源代码"涵盖了Hadoop如何管理和优化大量小文件的关键技术,通过深入学习这部分源代码,我们可以更全面地理解Hadoop的架构,提升在大数据环境下的系统设计和优化能力。
《Hadoop实战源代码》是针对大数据处理框架Hadoop的一份深入学习资料,涵盖了Java、Python和C++三种编程语言的应用。这份资源旨在帮助开发者理解Hadoop的核心原理,并通过源代码实现来提升实际操作能力。以下是对每...
Hadoop源代码分析完整版.pdf
很抱歉,根据您提供的信息,"8K UHD"视频文件与"Hadoop历史源代码归档"的主题并不相符。Hadoop是一个开源的分布式计算框架,主要用于处理和存储大量数据,而您提供的两个MP4文件是视频内容,通常与编程或IT技术无关...
深入云计算:Hadoop源代码分析(修订版)
包mapreduce.lib.map的Hadoop源代码分析
《Hadoop源代码分析》是一本深入探讨Hadoop核心组件MapReduce的专著。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算框架,以处理和存储大量数据。MapReduce是Hadoop的核心计算模型,它通过将大...
Hadoop 源代码分析 Hadoop 是一个开源的分布式计算框架,由 Apache 基金会维护。Hadoop 的核心组件包括 HDFS(Hadoop Distributed File System)和 MapReduce。HDFS 是一个分布式文件系统,可以存储大量的数据,而 ...
《Hadoop实战源代码》(HadoopinAction_source_code)是针对大数据处理框架Hadoop的一份珍贵资源,其中包含了从知名书籍《Hadoop in Action》官网获取的实际代码示例。这些示例涵盖了Hadoop的核心组件及其应用,为...
### 深入云计算 Hadoop源代码分析 #### 一、引言 随着大数据时代的到来,数据处理成为了各个领域中的关键技术之一。Hadoop作为一个开源的大数据处理框架,因其优秀的分布式计算能力,在业界得到了广泛的应用。...
总的来说,"tomwhite-hadoop-book-src"这个压缩包内的源代码是学习Hadoop理论知识与实践经验的宝贵资源。通过深入研究和实践,你可以更好地理解分布式计算的精髓,提升自己在大数据领域的技能。
### Hadoop源代码Eclipse编译教程 #### 一、下载Hadoop源代码 - **下载地址**: Apache Hadoop的源代码可以通过官方的SVN仓库获取。具体地址为:`http://svn.apache.org/repos/asf/hadoop`。 - **推荐方式**: 使用...
在标题“Hadoop 自定义 Partitioner 源代码”中,我们可以理解为讨论的是如何创建和理解 Partitioner 的源代码,以便于开发者可以更好地控制 MapReduce job 中的数据分片过程。自定义 Partitioner 可能涉及到以下...
"Hadoop源代码分析完整版" Hadoop作为一个开源的分布式计算框架,具有高可扩展性和高性能的特点。Hadoop的源代码分析可以分为多个部分,包括HDFS、MapReduce、YARN等。 首先,让我们了解Hadoop的架构。Hadoop的...
标题“进军Hadoop源代码”和描述“进军Hadoop源代码,进军Hadoop源代码,进军Hadoop源代码,进军Hadoop源代码”看似重复,实际上强调了对Hadoop源代码的重要性。Hadoop是一个开源的分布式存储与计算系统,由Apache...