`
p_3er
  • 浏览: 55720 次
  • 性别: Icon_minigender_1
  • 来自: 广州
文章分类
社区版块
存档分类
最新评论

Hadoop 使用Combiner提高Map/Reduce程序效率

 
阅读更多

众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。


在上述过程中,我们看到至少两个性能瓶颈:

  1. 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  2. 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。


如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:


Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。


由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。

代码如下:

  1. packagecom;
  2. importjava.io.IOException;
  3. importorg.apache.hadoop.conf.Configuration;
  4. importorg.apache.hadoop.conf.Configured;
  5. importorg.apache.hadoop.fs.Path;
  6. importorg.apache.hadoop.io.DoubleWritable;
  7. importorg.apache.hadoop.io.LongWritable;
  8. importorg.apache.hadoop.io.Text;
  9. importorg.apache.hadoop.mapreduce.Job;
  10. importorg.apache.hadoop.mapreduce.Mapper;
  11. importorg.apache.hadoop.mapreduce.Reducer;
  12. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. importorg.apache.hadoop.util.Tool;
  17. importorg.apache.hadoop.util.ToolRunner;
  18. publicclassAveragingWithCombinerextendsConfiguredimplementsTool{
  19. publicstaticclassMapClassextendsMapper<LongWritable,Text,Text,Text>{
  20. staticenumClaimsCounters{MISSING,QUOTED};
  21. //MapMethod
  22. publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
  23. Stringfields[]=value.toString().split(",",-20);
  24. Stringcountry=fields[4];
  25. StringnumClaims=fields[8];
  26. if(numClaims.length()>0&&!numClaims.startsWith("\"")){
  27. context.write(newText(country),newText(numClaims+",1"));
  28. }
  29. }
  30. }
  31. publicstaticclassReduceextendsReducer<Text,Text,Text,DoubleWritable>{
  32. //ReduceMethod
  33. publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
  34. doublesum=0;
  35. intcount=0;
  36. for(Textvalue:values){
  37. Stringfields[]=value.toString().split(",");
  38. sum+=Double.parseDouble(fields[0]);
  39. count+=Integer.parseInt(fields[1]);
  40. }
  41. context.write(key,newDoubleWritable(sum/count));
  42. }
  43. }
  44. publicstaticclassCombineextendsReducer<Text,Text,Text,Text>{
  45. //ReduceMethod
  46. publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
  47. doublesum=0;
  48. intcount=0;
  49. for(Textvalue:values){
  50. Stringfields[]=value.toString().split(",");
  51. sum+=Double.parseDouble(fields[0]);
  52. count+=Integer.parseInt(fields[1]);
  53. }
  54. context.write(key,newText(sum+","+count));
  55. }
  56. }
  57. //runMethod
  58. publicintrun(String[]args)throwsException{
  59. //CreateandRuntheJob
  60. Jobjob=newJob();
  61. job.setJarByClass(AveragingWithCombiner.class);
  62. FileInputFormat.addInputPath(job,newPath(args[0]));
  63. FileOutputFormat.setOutputPath(job,newPath(args[1]));
  64. job.setJobName("AveragingWithCombiner");
  65. job.setMapperClass(MapClass.class);
  66. job.setCombinerClass(Combine.class);
  67. job.setReducerClass(Reduce.class);
  68. job.setInputFormatClass(TextInputFormat.class);
  69. job.setOutputFormatClass(TextOutputFormat.class);
  70. job.setOutputKeyClass(Text.class);
  71. job.setOutputValueClass(Text.class);
  72. System.exit(job.waitForCompletion(true)?0:1);
  73. return0;
  74. }
  75. publicstaticvoidmain(String[]args)throwsException{
  76. intres=ToolRunner.run(newConfiguration(),newAveragingWithCombiner(),args);
  77. System.exit(res);
  78. }
  79. }

分享到:
评论

相关推荐

    Windows平台下Hadoop的Map/Reduce开发

    Eclipse是一款广泛使用的Java集成开发环境,可以用来编写和调试Hadoop Map/Reduce程序。通过以下步骤在Eclipse中配置Hadoop: 1. 设置Hadoop主目录,指向Hadoop安装位置。 2. 创建Hadoop的远程工作区,指定HDFS中的...

    hadoop之map/reduce

    在开发MapReduce程序时,还要注意优化性能,例如合理设置分区器(Partitioner)、Combiner(如果适用)以及优化数据本地性,以提高计算效率。同时,日志处理和异常处理也非常重要,可以帮助开发者更好地跟踪和调试...

    提高hadoop的mapreduce job效率笔记

    例如,避免在 Map 或 Reduce 函数中进行不必要的计算,减少对象创建,以及利用缓存和批处理。 通过综合运用这些优化策略,可以显著提升 Hadoop MapReduce Job 的处理效率,同时确保数据处理的准确性和可扩展性。...

    Hadoop Combiner使用方法详解

    Hadoop Combiner是MapReduce编程模型中的一个重要组件,它可以减少发送到Reducer的数据量,从而提高网络效率和Reduce端的效率。下面是Hadoop Combiner的使用方法详解: Combiner的优点 1. 减少发送到Reducer的数据...

    hadoop,map,reduce,hdfs

    为了提高存储效率和传输速度,Hadoop支持对数据进行压缩。常见的压缩算法有Gzip、BZip2、Snappy等。序列化则用于将对象转换为可以存储或传输的形式,Hadoop支持多种序列化格式,如Writables、JSON、Thrift等。 ####...

    Hadoop Map Reduce教程

    - **Combiner 使用**:在 Map 端使用 Combiner 函数预先聚合数据,减少网络传输量。 #### 五、案例分析 - **WordCount 示例**:这是一个经典的 MapReduce 示例,用于统计文本文件中单词出现的次数。 - **Inverted ...

    Hadoop MapReduce实战手册(完整版)

    Combiner可以在本地节点上提前减少数据传输,提高效率;Partitioner则控制中间结果发送到哪个Reducer,通常基于键的哈希值。 对于实际应用,书中可能提供了案例研究,如日志分析、网页排名(PageRank)计算、关联...

    MapReduce Tutorial

    Hadoop Map/Reduce 是一个软件框架,用于编写能够并行处理海量数据(多太字节级别的数据集)的应用程序。这些应用程序可以在大量由商用硬件组成的集群(数千个节点)上可靠且容错地运行。 一个典型的 Map/Reduce ...

    hadoop-training-map-reduce-example-4

    标题中的"hadoop-training-map-reduce-example-4"表明这是一个关于Hadoop MapReduce的教程实例,很可能是第四个阶段或示例。Hadoop是Apache软件基金会的一个开源项目,它提供了分布式文件系统(HDFS)和MapReduce...

    IT面试-Hadoop总结-云计算

    2. Map/Reduce Slots调优:Map/Reduce Slots是Hadoop中的并发度参数,通过调整这个参数可以提高Hadoop的并发处理能力。 3.Job调优:Job是Hadoop中的计算任务,可以通过调整Job的参数来提高计算效率。 四、Hadoop...

    Map_Reduce_Hadoop:实施map-reduce程序来执行等值连接

    在大数据处理领域,Hadoop MapReduce是一种分布式计算框架,它允许开发者编写并运行处理大规模数据集的应用程序。本文将深入探讨如何使用MapReduce和Java来实现等值连接操作,这是一种在数据库查询中常见的操作,...

    Hadoop倒排索引程序

    此外,还可以考虑使用更高级的分布式索引结构,如Bloom Filter或Lucene等,以提高索引质量和查询效率。 总的来说,“Hadoop倒排索引程序”是Hadoop并行框架在文本处理和信息检索领域的成功实践,它展示了大数据处理...

    hadoop集群配置及mapreduce开发手册

    ### Hadoop集群配置及MapReduce开发手册知识点梳理 #### 一、Hadoop集群配置说明 ##### 1.1 环境说明 ...通过以上步骤,可以有效地配置和优化Hadoop集群,提高MapReduce程序的运行效率和稳定性。

    hadoop-eclipse-plugin-2.10.1.zip

    2. **代码优化**:利用Hadoop的特性,如Combiner和Partitioner,来提高MapReduce程序的性能。 3. **测试与调试**:充分利用插件提供的调试功能,对代码进行充分的单元测试,确保程序的正确性。 六、总结 Hadoop ...

    avg-time hadoop程序

    例如,他们可能会使用Combiner来减少网络传输的数据量,提高效率;或者采用自定义分区器(Partitioner)来确保相同键的数据被发送到同一个Reducer,从而优化并行处理。 为了运行这个名为"hw2"的Hadoop程序,你需要...

    开发和优化高效的Hadoop & Hive 程序

    使用StringBuilder代替StringBuffer和Formatter,可以提高字符串拼接的效率;而在处理配置文件和词典时,可以利用DistributedCache来加载这些文件到TaskTracker节点上,从而避免在map和reduce阶段重复读取外部文件。...

    Hadoop C++ 扩展

    10. **Combiner**:在Map阶段对输出进行局部聚合,减少Shuffle阶段的数据传输量。 11. **Committer**:负责将最终结果写入到指定位置。 12. **Compression**:支持数据的压缩和解压缩功能。 13. **...

    hadoop的java_api

    在Hadoop 2.x中,YARN(Yet Another Resource Negotiator)引入,将资源管理和任务调度分离,提高了系统的灵活性和效率。 9. **Hadoop配置**:在使用Hadoop Java API时,通常需要通过`Configuration`对象设置参数,...

    词频统计,利用Hadoop中mappereduce进行单词的计数

    - Combiner:在map阶段使用本地聚合,减少中间数据的大小。 - Reducer数量调整:根据数据规模和计算资源来设置合适的Reducer数量,过多或过少都可能影响效率。 通过Hadoop MapReduce进行词频统计,不仅可以应用于...

Global site tag (gtag.js) - Google Analytics