转http://blog.csdn.net/wawmg/article/details/17095125
在MR实践中,会有很多小文件,单个文件产生一个mapper,资源比较浪费,后续没有reduce逻辑的话,会产生很多小文件,文件数量暴涨,对后续的hive job产生影响。
所以需要在mapper中将多个文件合成一个split作为输入,CombineFileInputFormat满足我们的需求。
CombineFileInputFormat 原理(网上牛人总结):
第一次:将同DN上的所有block生成Split,生成方式:
1.循环nodeToBlocks,获得每个DN上有哪些block
2.循环这些block列表
3.将block从blockToNodes中移除,避免同一个block被包含在多个split中
4.将该block添加到一个有效block的列表中,这个列表主要是保留哪些block已经从blockToNodes中被移除了,方便后面恢复到blockToNodes中
5.向临时变量curSplitSize增加block的大小
6.判断curSplitSize是否已经超过了设置的maxSize
a) 如果超过,执行并添加split信息,并重置curSplitSize和validBlocks
b) 没有超过,继续循环block列表,跳到第2步
7.当前DN上的block列表循环完成,判断剩余的block是否允许被split(剩下的block大小之和是否大于每个DN的最小split大小)
a) 如果允许,执行并添加split信息
b) 如果不被允许,将这些剩余的block归还blockToNodes
8.重置
9.跳到步骤1
- // process all nodes and create splits that are local
- // to a node.
- //创建同一个DN上的split
- for (Iterator<Map.Entry<String,
- List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
- iter.hasNext();) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- //创建这些block合并后的split,并将其split添加到split列表中
- addCreatedSplit(job, splits, nodes, validBlocks);
- //重置
- curSplitSize = 0;
- validBlocks.clear();
- }
- }
- }
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the same rack later on.
- //其实这里的注释已经说的很清楚,我再按照我的理解说一下
- /**
- * 这里有几种情况:
- * 1、在这个DN上还有没有被split的block,
- * 而且这些block的大小大于了在一个DN上的split最小值(没有达到最大值),
- * 将把这些block合并成一个split
- * 2、剩余的block的大小还是没有达到,将剩余的这些block
- * 归还给blockToNodes,等以后统一处理
- */
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
- }
- }
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
- }
第二次:对不再同一个DN上但是在同一个Rack上的block进行合并(只是之前还剩下的block)
- // if blocks in a rack are below the specified minimum size, then keep them
- // in 'overflow'. After the processing of all racks is complete, these overflow
- // blocks will be combined into splits.
- ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> racks = new ArrayList<String>();
- // Process all racks over and over again until there is no more work to do.
- //这里处理的就不再是同一个DN上的block
- //同一个DN上的已经被处理过了(上面的代码),这里是一些
- //还没有被处理的block
- while (blockToNodes.size() > 0) {
- // Create one split for this rack before moving over to the next rack.
- // Come back to this rack after creating a single split for each of the
- // remaining racks.
- // Process one rack location at a time, Combine all possible blocks that
- // reside on this rack as one split. (constrained by minimum and maximum
- // split size).
- // iterate over all racks
- //创建同机架的split
- for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
- rackToBlocks.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- racks.add(one.getKey());
- List<OneBlockInfo> blocks = one.getValue();
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- boolean createdSplit = false;
- for (OneBlockInfo oneblock : blocks) {
- //这里很重要,现在的blockToNodes说明的是还有哪些block没有被split
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, getHosts(racks), validBlocks);
- createdSplit = true;
- break;
- }
- }
- }
- // if we created a split, then just go to the next rack
- if (createdSplit) {
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- continue;
- }
- //还有没有被split的block
- //如果这些block的大小大于了同机架的最小split,
- //则创建split
- //否则,将这些block留到后面处理
- if (!validBlocks.isEmpty()) {
- if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
- // if there is a mimimum size specified, then create a single split
- // otherwise, store these blocks into overflow data structure
- addCreatedSplit(job, splits, getHosts(racks), validBlocks);
- } else {
- // There were a few blocks in this rack that remained to be processed.
- // Keep them in 'overflow' block list. These will be combined later.
- overflowBlocks.addAll(validBlocks);
- }
- }
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- }
- }
最后,对于既不在同DN也不在同rack的block进行合并(经过前两步还剩下的block),这里源码就没有什么了,就不再贴了
源码总结:
合并,经过了3个步骤。同DN----》同rack不同DN-----》不同rack
将可以合并的block写到同一个split中
下面是实践代码:
原始文件是70M每个的小文件,有些更小,sequence类型,需要自己实现RecordRead(Text就比较简单),key是byteWrite类型,现在需要减少文件个数,每个文件的大小接近block的大小。
自定义CombineSequenceFileInputFormat:
- package com.hadoop.combineInput;
- import java.io.IOException;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
- import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
- public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
- return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSequenceFileRecordReader.class);
- }
- }
实现 CombineSequenceFileRecordReader
- package com.hadoop.combineInput;
- import java.io.IOException;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
- import org.apache.hadoop.util.ReflectionUtils;
- public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {
- private CombineFileSplit split;
- private TaskAttemptContext context;
- private int index;
- private RecordReader<K, V> rr;
- @SuppressWarnings("unchecked")
- public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
- this.index = index;
- this.split = (CombineFileSplit) split;
- this.context = context;
- this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
- }
- @SuppressWarnings("unchecked")
- @Override
- public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
- this.split = (CombineFileSplit) curSplit;
- this.context = curContext;
- if (null == rr) {
- rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
- }
- FileSplit fileSplit = new FileSplit(this.split.getPath(index),
- this.split.getOffset(index), this.split.getLength(index),
- this.split.getLocations());
- this.rr.initialize(fileSplit, this.context);
- }
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return rr.getProgress();
- }
- @Override
- public void close() throws IOException {
- if (null != rr) {
- rr.close();
- rr = null;
- }
- }
- @Override
- public K getCurrentKey()
- throws IOException, InterruptedException {
- return rr.getCurrentKey();
- }
- @Override
- public V getCurrentValue()
- throws IOException, InterruptedException {
- return rr.getCurrentValue();
- }
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return rr.nextKeyValue();
- }
- }
main函数比较简单,这里也贴出来下,方便后续自己记忆:
- package com.hadoop.combineInput;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- public class MergeFiles extends Configured implements Tool {
- public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {
- public void map(BytesWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- context.write(key, value);
- }
- } // END: MapClass
- public int run(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.set("mapred.max.split.size", "157286400");
- conf.setBoolean("mapred.output.compress", true);
- Job job = new Job(conf);
- job.setJobName("MergeFiles");
- job.setJarByClass(MergeFiles.class);
- job.setMapperClass(MapClass.class);
- job.setInputFormatClass(CombineSequenceFileInputFormat.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Text.class);
- FileInputFormat.addInputPaths(job, args[0]);
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setNumReduceTasks(0);
- return job.waitForCompletion(true) ? 0 : 1;
- } // END: run
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new MergeFiles(), args);
- System.exit(ret);
- } // END: main
- } //
性能测试:70M大小的压缩sequence文件,2000个,转换成是700个压缩sequence文件,平均每个200M(可控),blocksize=256,耗时2分半到3分钟。
存在问题:
- 合并后会造成mapper不能本地化,带来mapper的额外开销,需要权衡
相关推荐
"006_hadoop中MapReduce详解_3"可能是指一个系列教程的第三部分,着重讲解MapReduce的核心概念、工作原理以及实际应用。在这个部分,我们可能会探讨更深入的技术细节和优化策略。 MapReduce的工作流程分为两个主要...
《Hadoop技术详解》这本书是关于Hadoop操作的详尽指南,它涵盖了Hadoop生态系统中的核心组件、工作原理以及实际操作技巧。Hadoop是大数据处理领域的重要工具,它以其分布式计算框架闻名,允许企业在大规模数据集上...
Hadoop集群搭建详解 Hadoop是一个大数据处理框架,由Apache基金会开发和维护。它提供了一个分布式计算环境,能够处理大量数据。Hadoop生态系统包括了多个组件,如HDFS、MapReduce、YARN、HBase、Hive等。 Hadoop...
Hadoop源代码详解.doc
Hadoop运行流程详解 Hadoop是一个开源分布式计算框架,核心由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。本篇将详细阐述Hadoop中的MapReduce执行流程,包括其主要概念、数据结构和整体...
Hadoop fs命令详解 Hadoop fs命令是Hadoop分布式文件系统(HDFS)的命令行接口,提供了丰富的文件操作命令,方便用户管理和维护HDFS文件系统。本文将详细介绍Hadoop fs命令的使用方法和实践操作。 基本命令 ...
在这个详解中,我们将深入理解如何在Fedora和Ubuntu系统上搭建一个Hadoop集群。 首先,为了确保集群中的节点能够相互识别,我们需要配置静态IP地址。在Ubuntu系统中,可以通过图形界面进行配置,而在Fedora系统中,...
合并文件InputFormat演示我的演示项目...用法hadoop jar CombineFileDemo-0.0.1-SNAPSHOT.jar TestMain <src> 执照版权所有:copyright:2014 Felix Chern 根据Eclipse Public License 1.0版或(可选)任何更高版本分发。
Hadoop是一种开源框架,用于分布式存储和处理大数据。它依赖于多个配置文件来定义其运行时行为。...在Hadoop的后续版本中,可能会有更多新的参数被引入,因此需要定期查阅官方文档,以获取最新和最准确的信息。
1. **Append支持**:在Hadoop的某些版本中,文件追加功能(Append)被引入,这对于需要实时更新数据的应用,如HBase,至关重要。 2. **RAID**:通过引入校验码来减少数据块数量,RAID提升了数据的可靠性,同时降低...
以下是对"Hadoop安装配置详解"的详细阐述: ### 1. 虚拟机安装 在进行Hadoop安装前,首先需要准备一个虚拟机环境。通常选择如VMware或VirtualBox等虚拟机软件。为了便于管理和扩展,建议采用Linux操作系统,如...
hadoop的常用命令详解,并配有例子说明详细信息
### Hadoop 分布式存储与计算框架详解 #### 一、引言 随着互联网技术的飞速发展,数据量呈爆炸性增长趋势,传统的数据处理方法已无法满足需求。为了解决大规模数据处理的问题,Hadoop应运而生。本文将详细介绍...
它可以将关系数据库中的数据导入到Hadoop的HDFS中,或者将数据从HDFS导出到外部系统中,如关系型数据库。 ### Python在Hadoop中的应用 描述中提到代码是使用Python语言编写的,Python作为一门编程语言,具有简洁易...
Hadoop作为一种强大的大数据处理框架,在当前的数据密集型应用中扮演着重要的角色。无论是对于数据科学家还是对于企业级应用开发者来说,了解Hadoop的基本原理及其生态系统都是十分必要的。通过学习Hadoop的核心组件...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。它的核心设计目标是分布式存储和并行处理大规模数据集,使企业能够高效地处理海量数据。本篇文章将深入探讨Hadoop应用开发的关键概念、组件...
【Hadoop中文乱码问题详解】 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储(HDFS)和分布式计算(MapReduce)的能力。然而,在处理包含中文字符的数据时,用户可能会遇到中文乱码的问题...
### 详解Hadoop核心架构HDFS #### HDFS体系架构概览 Hadoop作为一个领先的开源分布式计算框架,其核心组成部分之一便是Hadoop Distributed File System(HDFS),它为大规模数据处理提供了高效、可靠且可扩展的...
Hadoop环境搭建详细说明,附加截图。
《大数据技术丛书:Hadoop应用开发技术详解》共12章。第1~2章详细地介绍了Hadoop的生态系统、关键技术以及安装和配置;第3章是MapReduce的使用入门,让读者了解整个开发过程;第4~5章详细讲解了分布式文件系统HDFS...