- 浏览: 24939 次
- 性别:
- 来自: 上海
最新评论
Hadoop MapReduce的编程接口层主要有5个可编程组件,分别为InputFormat、Mapper、Partitioner、Reducer和OutputFormat。
InputFormat
主要用于描述输入数据的格式,提供两个功能:数据切分:将输入数据切分为若干个split(分片),每个split会被分发到一个Map任务中。
记录识别:通过创建RecordReader,使用它将某个split(分片)中的记录(key, value形式)识别出来(Mapper使用split前的初始化),每个记录会作为Mapper中map函数的输入。
[/list]
getSplits:
它只在逻辑上对输入数据进行分片,并不会在磁盘上将其切片分成分片进行存储。InputSplit只记录了分片的元数据信息(起始位置、长度以及所在的节点列表等)。
createRecordReader:
FileInputFormat的示例:
InputFormat (org.apache.hadoop.mapreduce) 子类层次图:
TextInputFormat分析
[list]文件切分算法
文件切分算法主要决定InputSplit的个数以及每个InputSplit对应的数据段。TextInputFormat继承FileInputFormat,以文件为单位切分生成InputSplit。
blockSize:文件在HDFS中存储的block的大小,默认为64MB,通过dfs.block.size设置。
minSize:InputSplit的最小值,由配置参数mapred.min.split.size设置,默认值为1。
maxSize:InputSplit的最大值,由配置参数mapred.max.split.size设置,默认值为Long.MAX_VALUE。
一旦确定splitSize值后,FileInputFormat将文件一次切成大小为splitSize的InputSplit,最后剩下不足splitSize的数据块单独成为一个InputSplit。
FileSplit
FileSplit继承InputSplit,包含了InputSplit所在的文件、起始位置、长度以及所在host的列表。
其中hosts的获取是通过InputSplit的所在文件查找(向NameNode)获取文件的所有BlockLocation,并通过InputSplit的起始位置查找对应的blkIndex,然后通过blkIndex获取对应BlockLocation的host信息。
LineRecordReader
LineRecordReader继承了RecordReader类,并适配了LineReader类。LineReader类通过构建了buffer字节数组缓冲(缓冲区大小由参数io.file.buffer.size设置,默认为64K),将数据从流中读出(DFSClient.DFSInputStream.read(byte buf[], int off, int len)),当Record跨块时,会重新定位node,并至少再次读取一次(从新定位的node中读取buffer长度的字节数组)
InputFormat
主要用于描述输入数据的格式,提供两个功能:
public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;
getSplits:
引用
Logically split the set of input files for the job.
Each InputSplit is then assigned to an individual Mapper for processing.
Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be <input-file-path, start, offset> tuple. The InputFormat also creates the RecordReader to read the InputSplit.
Each InputSplit is then assigned to an individual Mapper for processing.
Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be <input-file-path, start, offset> tuple. The InputFormat also creates the RecordReader to read the InputSplit.
它只在逻辑上对输入数据进行分片,并不会在磁盘上将其切片分成分片进行存储。InputSplit只记录了分片的元数据信息(起始位置、长度以及所在的节点列表等)。
createRecordReader:
引用
Create a record reader for a given split. The framework will call RecordReader.initialize(InputSplit, TaskAttemptContext) before the split is used.
FileInputFormat的示例:
InputFormat (org.apache.hadoop.mapreduce) 子类层次图:
TextInputFormat分析
[list]
引用
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
在计算splitSize中使用了blockSize, minSize, maxSize。
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
blockSize:文件在HDFS中存储的block的大小,默认为64MB,通过dfs.block.size设置。
minSize:InputSplit的最小值,由配置参数mapred.min.split.size设置,默认值为1。
maxSize:InputSplit的最大值,由配置参数mapred.max.split.size设置,默认值为Long.MAX_VALUE。
一旦确定splitSize值后,FileInputFormat将文件一次切成大小为splitSize的InputSplit,最后剩下不足splitSize的数据块单独成为一个InputSplit。
long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); }
/** Constructs a split with host information * * @param file the file name * @param start the position of the first byte in the file to process * @param length the number of bytes in the file to process * @param hosts the list of hosts containing the block, possibly null */ public FileSplit(Path file, long start, long length, String[] hosts) { this.file = file; this.start = start; this.length = length; this.hosts = hosts; }
其中hosts的获取是通过InputSplit的所在文件查找(向NameNode)获取文件的所有BlockLocation,并通过InputSplit的起始位置查找对应的blkIndex,然后通过blkIndex获取对应BlockLocation的host信息。
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { return new LineRecordReader(); }
LineRecordReader继承了RecordReader类,并适配了LineReader类。LineReader类通过构建了buffer字节数组缓冲(缓冲区大小由参数io.file.buffer.size设置,默认为64K),将数据从流中读出(DFSClient.DFSInputStream.read(byte buf[], int off, int len)),当Record跨块时,会重新定位node,并至少再次读取一次(从新定位的node中读取buffer长度的字节数组)
if (pos > blockEnd) { currentNode = blockSeekTo(pos); } int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L)); int result = readBuffer(buf, off, realLen);
发表评论
-
[实验]avro与non-avro的mapred例子-wordcount改写
2013-09-03 16:15 1052avro非常适合用于hadoop。在开发的时候可能有这样的场景 ... -
[实验]hadoop例子 trackinfo数据清洗的改写
2013-09-03 10:42 1091之前的“trackinfo数据清洗”例子中为使用combine ... -
[笔记]hadoop tutorial - Reducer
2013-09-03 10:15 737引用Reducer reduces a set of inte ... -
[实验]hadoop例子 trackinfo数据清洗
2013-09-02 17:24 2568业务场景: 假设用户在某处(例如某个网页或者某个地点)的活动会 ... -
[环境] hadoop 开发环境maven管理
2013-09-02 17:02 1469贴一下整理的maven管理配置(待补充) <proj ... -
[笔记]avro 介绍及官网例子
2013-09-02 14:22 3895Apache Avro是一个独立于编程语言的数据序列化系统。旨 ... -
[实验]hadoop例子 在线用户分析
2013-08-30 15:54 900一个简单的业务场景和例子。由wordcount例子改写。 业 ... -
[笔记]hdfs namenode FSNamesystem分析
2013-08-30 09:18 1153NameNode在内存中维护整个文件系统的元数据镜像,用于HD ... -
[笔记]hdfs namenode FSImage分析1
2013-08-29 15:10 1887元数据文件fsimage的分析 fsimage为元数据镜像文件 ... -
[实验]集群hadoop配置
2013-08-28 16:53 855环境 hadoop1.2.0 CentOS release ... -
[实验]单机hadoop配置
2013-08-28 14:16 615环境: hadoop1.2.0 配置 修改conf/core ... -
[问题解决]hadoop eclipse plugin
2013-08-27 09:22 983环境: hadoop 1.2.0 问题: eclipse报错& ...
相关推荐
hadoop mapred_tutorial官方文档
Hadoop源码 包含mapred
基于Hadoop的成绩分析系统 本文档介绍了基于Hadoop的成绩分析系统的设计和实现。Hadoop是一个分布式开源计算平台,具有高可靠性、高扩展性、高效性和高容错性等特点。该系统使用Hadoop的分布式文件系统HDFS和...
如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop...
Hadoop 2.9.0版本中的mapred-default.xml文件包含了MapReduce作业的配置属性,这些属性定义了MapReduce作业执行过程中的各种行为和参数。下面我们来详细介绍mapred-site.xml文件中的一些关键属性。 1. mapreduce....
,Hadoop 技术已经在互联网领域得到了广泛的应用。...同样也得到了许多公司的青睐,如百度主要将Hadoop 应用于日志分析和网页数据库的数据 挖掘;阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。
《Hadoop源代码分析》是一本深入探讨Hadoop核心组件MapReduce的专著。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算框架,以处理和存储大量数据。MapReduce是Hadoop的核心计算模型,它通过将大...
针对本次实验,我们需要用到Hadoop集群作为模拟大数据的分析软件,集群环境必须要包括,hdfs,hbase,hive,flume,sqoop等插件,最后结合分析出来的数据进行可视化展示,需要用到Python(爬取数据集,可视化展示)...
总的来说,设计一个基于Hadoop的数据分析系统涉及到多个环节,从需求分析到系统设计,再到具体的部署和优化,每个步骤都需要细致考虑和精心实施。通过这样的系统,企业能够高效地处理和分析海量数据,从而获取有价值...
Hadoop源代码分析完整版.pdf
深入云计算:Hadoop源代码分析(修订版)
使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...
包org.apache.hadoop.mapreduce的Hadoop源代码分析
【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...
【基于Hadoop的电影影评数据分析】是一项大数据课程的大作业,旨在利用Hadoop的分布式处理能力来分析电影影评数据。Hadoop是一个由Apache软件基金会开发的开源框架,专为处理和存储大规模数据而设计。它由四个核心...
Hadoop 源代码分析 Hadoop 是一个开源的分布式计算框架,由 Apache 基金会维护。Hadoop 的核心组件包括 HDFS(Hadoop Distributed File System)和 MapReduce。HDFS 是一个分布式文件系统,可以存储大量的数据,而 ...
Hadoop 代码使用方式 ...hadoop jar hadoop-mapreduce-custom-inputformat-1.0-SNAPSHOT.jar org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10
使用hadoop进行数据分析需要注意哪些事项?重点做好哪些问题?.zip 使用hadoop进行数据分析需要注意哪些事项?重点做好哪些问题?.zip 使用hadoop进行数据分析需要注意哪些事项?重点做好哪些问题?.zip 使用hadoop...
**基于Hadoop平台的数据仓库可行性分析报告** **1. 引言** 在信息化时代,企业对数据处理的需求日益增长,传统的数据仓库系统由于其规模、性能和灵活性的限制,已经无法满足现代企业对大数据处理的需求。Hadoop作为...