`

Hadoop中CombineFileInputFormat详解

 
阅读更多

 

转http://blog.csdn.net/wawmg/article/details/17095125

 

在MR实践中,会有很多小文件,单个文件产生一个mapper,资源比较浪费,后续没有reduce逻辑的话,会产生很多小文件,文件数量暴涨,对后续的hive job产生影响。

所以需要在mapper中将多个文件合成一个split作为输入,CombineFileInputFormat满足我们的需求。

CombineFileInputFormat 原理(网上牛人总结):

 

第一次:将同DN上的所有block生成Split,生成方式:

1.循环nodeToBlocks,获得每个DN上有哪些block

2.循环这些block列表

3.将block从blockToNodes中移除,避免同一个block被包含在多个split中

4.将该block添加到一个有效block的列表中,这个列表主要是保留哪些block已经从blockToNodes中被移除了,方便后面恢复到blockToNodes

5.向临时变量curSplitSize增加block的大小

6.判断curSplitSize是否已经超过了设置的maxSize

a) 如果超过,执行并添加split信息,并重置curSplitSizevalidBlocks

b) 没有超过,继续循环block列表,跳到第2步

7.当前DN上的block列表循环完成,判断剩余的block是否允许被split(剩下的block大小之和是否大于每个DN的最小split大小

a) 如果允许,执行并添加split信息

b) 如果不被允许,将这些剩余的block归还blockToNodes

8.重置

9.跳到步骤1

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. // process all nodes and create splits that are local     
  2.     // to a node.      
  3.     //创建同一个DN上的split     
  4.     for (Iterator<Map.Entry<String,      
  5.          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();      
  6.          iter.hasNext();) {     
  7.      
  8.       Map.Entry<String, List<OneBlockInfo>> one = iter.next();     
  9.       nodes.add(one.getKey());     
  10.       List<OneBlockInfo> blocksInNode = one.getValue();     
  11.      
  12.       // for each block, copy it into validBlocks. Delete it from      
  13.       // blockToNodes so that the same block does not appear in      
  14.       // two different splits.     
  15.       for (OneBlockInfo oneblock : blocksInNode) {     
  16.         if (blockToNodes.containsKey(oneblock)) {     
  17.           validBlocks.add(oneblock);     
  18.           blockToNodes.remove(oneblock);     
  19.           curSplitSize += oneblock.length;     
  20.      
  21.           // if the accumulated split size exceeds the maximum, then      
  22.           // create this split.     
  23.           if (maxSize != 0 && curSplitSize >= maxSize) {     
  24.             // create an input split and add it to the splits array     
  25.             //创建这些block合并后的split,并将其split添加到split列表中     
  26.             addCreatedSplit(job, splits, nodes, validBlocks);     
  27.             //重置     
  28.             curSplitSize = 0;     
  29.             validBlocks.clear();     
  30.           }     
  31.         }     
  32.       }     
  33.       // if there were any blocks left over and their combined size is     
  34.       // larger than minSplitNode, then combine them into one split.     
  35.       // Otherwise add them back to the unprocessed pool. It is likely      
  36.       // that they will be combined with other blocks from the same rack later on.     
  37.       //其实这里的注释已经说的很清楚,我再按照我的理解说一下     
  38.       /**   
  39.        * 这里有几种情况:   
  40.        * 1、在这个DN上还有没有被split的block,   
  41.        * 而且这些block的大小大于了在一个DN上的split最小值(没有达到最大值),   
  42.        * 将把这些block合并成一个split   
  43.        * 2、剩余的block的大小还是没有达到,将剩余的这些block   
  44.        * 归还给blockToNodes,等以后统一处理   
  45.        */     
  46.       if (minSizeNode != 0 && curSplitSize >= minSizeNode) {     
  47.         // create an input split and add it to the splits array     
  48.         addCreatedSplit(job, splits, nodes, validBlocks);     
  49.       } else {     
  50.         for (OneBlockInfo oneblock : validBlocks) {     
  51.           blockToNodes.put(oneblock, oneblock.hosts);     
  52.         }     
  53.       }     
  54.       validBlocks.clear();     
  55.       nodes.clear();     
  56.       curSplitSize = 0;     
  57.     }     


第二次:对不再同一个DN上但是在同一个Rack上的block进行合并(只是之前还剩下的block)

 

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. // if blocks in a rack are below the specified minimum size, then keep them     
  2.     // in 'overflow'. After the processing of all racks is complete, these overflow     
  3.     // blocks will be combined into splits.     
  4.     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();     
  5.     ArrayList<String> racks = new ArrayList<String>();     
  6.      
  7.     // Process all racks over and over again until there is no more work to do.     
  8.     //这里处理的就不再是同一个DN上的block     
  9.     //同一个DN上的已经被处理过了(上面的代码),这里是一些     
  10.     //还没有被处理的block     
  11.     while (blockToNodes.size() > 0) {     
  12.      
  13.       // Create one split for this rack before moving over to the next rack.      
  14.       // Come back to this rack after creating a single split for each of the      
  15.       // remaining racks.     
  16.       // Process one rack location at a time, Combine all possible blocks that     
  17.       // reside on this rack as one split. (constrained by minimum and maximum     
  18.       // split size).     
  19.      
  20.       // iterate over all racks      
  21.     //创建同机架的split     
  22.       for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =      
  23.            rackToBlocks.entrySet().iterator(); iter.hasNext();) {     
  24.      
  25.         Map.Entry<String, List<OneBlockInfo>> one = iter.next();     
  26.         racks.add(one.getKey());     
  27.         List<OneBlockInfo> blocks = one.getValue();     
  28.      
  29.         // for each block, copy it into validBlocks. Delete it from      
  30.         // blockToNodes so that the same block does not appear in      
  31.         // two different splits.     
  32.         boolean createdSplit = false;     
  33.         for (OneBlockInfo oneblock : blocks) {     
  34.             //这里很重要,现在的blockToNodes说明的是还有哪些block没有被split     
  35.           if (blockToNodes.containsKey(oneblock)) {     
  36.             validBlocks.add(oneblock);     
  37.             blockToNodes.remove(oneblock);     
  38.             curSplitSize += oneblock.length;     
  39.            
  40.             // if the accumulated split size exceeds the maximum, then      
  41.             // create this split.     
  42.             if (maxSize != 0 && curSplitSize >= maxSize) {     
  43.               // create an input split and add it to the splits array     
  44.               addCreatedSplit(job, splits, getHosts(racks), validBlocks);     
  45.               createdSplit = true;     
  46.               break;     
  47.             }     
  48.           }     
  49.         }     
  50.      
  51.         // if we created a split, then just go to the next rack     
  52.         if (createdSplit) {     
  53.           curSplitSize = 0;     
  54.           validBlocks.clear();     
  55.           racks.clear();     
  56.           continue;     
  57.         }     
  58.      
  59.         //还有没有被split的block     
  60.         //如果这些block的大小大于了同机架的最小split,     
  61.         //则创建split     
  62.         //否则,将这些block留到后面处理     
  63.         if (!validBlocks.isEmpty()) {     
  64.           if (minSizeRack != 0 && curSplitSize >= minSizeRack) {     
  65.             // if there is a mimimum size specified, then create a single split     
  66.             // otherwise, store these blocks into overflow data structure     
  67.             addCreatedSplit(job, splits, getHosts(racks), validBlocks);     
  68.           } else {     
  69.             // There were a few blocks in this rack that remained to be processed.     
  70.             // Keep them in 'overflow' block list. These will be combined later.     
  71.             overflowBlocks.addAll(validBlocks);     
  72.           }     
  73.         }     
  74.         curSplitSize = 0;     
  75.         validBlocks.clear();     
  76.         racks.clear();     
  77.       }     
  78.     }     


最后,对于既不在同DN也不在同rack的block进行合并(经过前两步还剩下的block),这里源码就没有什么了,就不再贴了

 

 

 

源码总结:

 

合并,经过了3个步骤。同DN----》同rack不同DN-----》不同rack

将可以合并的block写到同一个split中

下面是实践代码:

原始文件是70M每个的小文件,有些更小,sequence类型,需要自己实现RecordRead(Text就比较简单),key是byteWrite类型,现在需要减少文件个数,每个文件的大小接近block的大小。

自定义CombineSequenceFileInputFormat:

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package com.hadoop.combineInput;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.mapreduce.InputSplit;  
  6. import org.apache.hadoop.mapreduce.RecordReader;  
  7. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  8. import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;  
  9. import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;  
  10. import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  
  11.   
  12. public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {  
  13.     @SuppressWarnings({ "unchecked""rawtypes" })  
  14.     @Override  
  15.     public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {  
  16.         return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSequenceFileRecordReader.class);  
  17.     }  
  18. }  


实现 CombineSequenceFileRecordReader

 

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package com.hadoop.combineInput;  
  2.   
  3.   
  4. import java.io.IOException;  
  5.   
  6. import org.apache.hadoop.mapreduce.InputSplit;  
  7. import org.apache.hadoop.mapreduce.RecordReader;  
  8. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  9. import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  
  10. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  11. import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;  
  12. import org.apache.hadoop.util.ReflectionUtils;  
  13.   
  14.   
  15. public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {  
  16.     private CombineFileSplit split;  
  17.     private TaskAttemptContext context;  
  18.     private int index;  
  19.     private RecordReader<K, V> rr;  
  20.   
  21.     @SuppressWarnings("unchecked")  
  22.     public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {  
  23.         this.index = index;  
  24.         this.split = (CombineFileSplit) split;  
  25.         this.context = context;  
  26.   
  27.         this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());  
  28.     }  
  29.   
  30.     @SuppressWarnings("unchecked")  
  31.     @Override  
  32.     public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {  
  33.         this.split = (CombineFileSplit) curSplit;  
  34.         this.context = curContext;  
  35.   
  36.         if (null == rr) {  
  37.             rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());  
  38.         }  
  39.   
  40.         FileSplit fileSplit = new FileSplit(this.split.getPath(index),  
  41.                 this.split.getOffset(index), this.split.getLength(index),  
  42.                 this.split.getLocations());  
  43.           
  44.         this.rr.initialize(fileSplit, this.context);  
  45.     }  
  46.   
  47.     @Override  
  48.     public float getProgress() throws IOException, InterruptedException {  
  49.         return rr.getProgress();  
  50.     }  
  51.   
  52.     @Override  
  53.     public void close() throws IOException {  
  54.         if (null != rr) {  
  55.             rr.close();  
  56.             rr = null;  
  57.         }  
  58.     }  
  59.   
  60.     @Override  
  61.     public K getCurrentKey()  
  62.     throws IOException, InterruptedException {  
  63.         return rr.getCurrentKey();  
  64.     }  
  65.   
  66.     @Override  
  67.     public V getCurrentValue()  
  68.     throws IOException, InterruptedException {  
  69.         return rr.getCurrentValue();  
  70.     }  
  71.   
  72.     @Override  
  73.     public boolean nextKeyValue() throws IOException, InterruptedException {  
  74.         return rr.nextKeyValue();  
  75.     }  
  76. }  

参考资料:http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java

 

main函数比较简单,这里也贴出来下,方便后续自己记忆:

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package com.hadoop.combineInput;  
  2.   
  3. import java.io.IOException;  
  4.   
  5.   
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.conf.Configured;  
  8. import org.apache.hadoop.fs.Path;  
  9.   
  10. import org.apache.hadoop.io.BytesWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapreduce.Job;  
  13. import org.apache.hadoop.mapreduce.Mapper;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
  17. import org.apache.hadoop.util.Tool;  
  18. import org.apache.hadoop.util.ToolRunner;  
  19.   
  20. public class MergeFiles extends Configured implements Tool {   
  21.     public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {  
  22.   
  23.         public void map(BytesWritable key, Text value, Context context)  
  24.                 throws IOException, InterruptedException {  
  25.             context.write(key, value);  
  26.         }  
  27.     } // END: MapClass  
  28.   
  29.       
  30.     public int run(String[] args) throws Exception {  
  31.         Configuration conf = new Configuration();  
  32.         conf.set("mapred.max.split.size""157286400");  
  33.         conf.setBoolean("mapred.output.compress"true);  
  34.         Job job = new Job(conf);  
  35.         job.setJobName("MergeFiles");  
  36.         job.setJarByClass(MergeFiles.class);  
  37.   
  38.         job.setMapperClass(MapClass.class);  
  39.         job.setInputFormatClass(CombineSequenceFileInputFormat.class);  
  40.         job.setOutputFormatClass(SequenceFileOutputFormat.class);  
  41.         job.setOutputKeyClass(BytesWritable.class);  
  42.         job.setOutputValueClass(Text.class);  
  43.   
  44.         FileInputFormat.addInputPaths(job, args[0]);  
  45.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  46.   
  47.         job.setNumReduceTasks(0);  
  48.   
  49.         return job.waitForCompletion(true) ? 0 : 1;  
  50.     } // END: run  
  51.   
  52.     public static void main(String[] args) throws Exception {  
  53.         int ret = ToolRunner.run(new MergeFiles(), args);  
  54.         System.exit(ret);  
  55.     } // END: main  
  56. //   
[java] view plaincopy
 
  1.   

性能测试:70M大小的压缩sequence文件,2000个,转换成是700个压缩sequence文件,平均每个200M(可控),blocksize=256,耗时2分半到3分钟。

 

存在问题:

  1. 合并后会造成mapper不能本地化,带来mapper的额外开销,需要权衡
分享到:
评论

相关推荐

    006_hadoop中MapReduce详解_3

    "006_hadoop中MapReduce详解_3"可能是指一个系列教程的第三部分,着重讲解MapReduce的核心概念、工作原理以及实际应用。在这个部分,我们可能会探讨更深入的技术细节和优化策略。 MapReduce的工作流程分为两个主要...

    Hadoop技术详解.Hadoop Operation

    《Hadoop技术详解》这本书是关于Hadoop操作的详尽指南,它涵盖了Hadoop生态系统中的核心组件、工作原理以及实际操作技巧。Hadoop是大数据处理领域的重要工具,它以其分布式计算框架闻名,允许企业在大规模数据集上...

    hadoop集群搭建详解

    Hadoop集群搭建详解 Hadoop是一个大数据处理框架,由Apache基金会开发和维护。它提供了一个分布式计算环境,能够处理大量数据。Hadoop生态系统包括了多个组件,如HDFS、MapReduce、YARN、HBase、Hive等。 Hadoop...

    Hadoop源代码详解.doc

    Hadoop源代码详解.doc

    Hadoop运行流程详解

    Hadoop运行流程详解 Hadoop是一个开源分布式计算框架,核心由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。本篇将详细阐述Hadoop中的MapReduce执行流程,包括其主要概念、数据结构和整体...

    Hadoop fs命令详解.docx

    Hadoop fs命令详解 Hadoop fs命令是Hadoop分布式文件系统(HDFS)的命令行接口,提供了丰富的文件操作命令,方便用户管理和维护HDFS文件系统。本文将详细介绍Hadoop fs命令的使用方法和实践操作。 基本命令 ...

    hadoop集群配置详解

    在这个详解中,我们将深入理解如何在Fedora和Ubuntu系统上搭建一个Hadoop集群。 首先,为了确保集群中的节点能够相互识别,我们需要配置静态IP地址。在Ubuntu系统中,可以通过图形界面进行配置,而在Fedora系统中,...

    Hadoop-CombineFileInputFormat:hadoop CombineFileInputFormat的示例实现

    合并文件InputFormat演示我的演示项目...用法hadoop jar CombineFileDemo-0.0.1-SNAPSHOT.jar TestMain &lt;src&gt; 执照版权所有:copyright:2014 Felix Chern 根据Eclipse Public License 1.0版或(可选)任何更高版本分发。

    hadoop配置文件详解

    Hadoop是一种开源框架,用于分布式存储和处理大数据。它依赖于多个配置文件来定义其运行时行为。...在Hadoop的后续版本中,可能会有更多新的参数被引入,因此需要定期查阅官方文档,以获取最新和最准确的信息。

    hadoop版本差异详解.doc

    1. **Append支持**:在Hadoop的某些版本中,文件追加功能(Append)被引入,这对于需要实时更新数据的应用,如HBase,至关重要。 2. **RAID**:通过引入校验码来减少数据块数量,RAID提升了数据的可靠性,同时降低...

    hadoop安装配置详解

    以下是对"Hadoop安装配置详解"的详细阐述: ### 1. 虚拟机安装 在进行Hadoop安装前,首先需要准备一个虚拟机环境。通常选择如VMware或VirtualBox等虚拟机软件。为了便于管理和扩展,建议采用Linux操作系统,如...

    hadoop常用命令详解,配有例子说明

    hadoop的常用命令详解,并配有例子说明详细信息

    hadoop详解

    ### Hadoop 分布式存储与计算框架详解 #### 一、引言 随着互联网技术的飞速发展,数据量呈爆炸性增长趋势,传统的数据处理方法已无法满足需求。为了解决大规模数据处理的问题,Hadoop应运而生。本文将详细介绍...

    hadoop海量数据处理详解与项目实战

    它可以将关系数据库中的数据导入到Hadoop的HDFS中,或者将数据从HDFS导出到外部系统中,如关系型数据库。 ### Python在Hadoop中的应用 描述中提到代码是使用Python语言编写的,Python作为一门编程语言,具有简洁易...

    Hadoop技术详解PDF电子书下载 带书签目录

    Hadoop作为一种强大的大数据处理框架,在当前的数据密集型应用中扮演着重要的角色。无论是对于数据科学家还是对于企业级应用开发者来说,了解Hadoop的基本原理及其生态系统都是十分必要的。通过学习Hadoop的核心组件...

    Hadoop应用开发详解

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。它的核心设计目标是分布式存储和并行处理大规模数据集,使企业能够高效地处理海量数据。本篇文章将深入探讨Hadoop应用开发的关键概念、组件...

    hadoop中文乱码问题

    【Hadoop中文乱码问题详解】 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储(HDFS)和分布式计算(MapReduce)的能力。然而,在处理包含中文字符的数据时,用户可能会遇到中文乱码的问题...

    详解Hadoop核心架构HDFS

    ### 详解Hadoop核心架构HDFS #### HDFS体系架构概览 Hadoop作为一个领先的开源分布式计算框架,其核心组成部分之一便是Hadoop Distributed File System(HDFS),它为大规模数据处理提供了高效、可靠且可扩展的...

    Hadoop环境搭建详解

    Hadoop环境搭建详细说明,附加截图。

    Hadoop应用开发技术详解(中文版)

    《大数据技术丛书:Hadoop应用开发技术详解》共12章。第1~2章详细地介绍了Hadoop的生态系统、关键技术以及安装和配置;第3章是MapReduce的使用入门,让读者了解整个开发过程;第4~5章详细讲解了分布式文件系统HDFS...

Global site tag (gtag.js) - Google Analytics