众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。
在上述过程中,我们看到至少两个性能瓶颈:
- 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
- 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。
Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。
如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:
Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。
由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。
代码如下:
-
packagecom;
-
-
importjava.io.IOException;
-
-
importorg.apache.hadoop.conf.Configuration;
-
importorg.apache.hadoop.conf.Configured;
-
importorg.apache.hadoop.fs.Path;
-
importorg.apache.hadoop.io.DoubleWritable;
-
importorg.apache.hadoop.io.LongWritable;
-
importorg.apache.hadoop.io.Text;
-
importorg.apache.hadoop.mapreduce.Job;
-
importorg.apache.hadoop.mapreduce.Mapper;
-
importorg.apache.hadoop.mapreduce.Reducer;
-
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
importorg.apache.hadoop.util.Tool;
-
importorg.apache.hadoop.util.ToolRunner;
-
-
publicclassAveragingWithCombinerextendsConfiguredimplementsTool{
-
-
publicstaticclassMapClassextendsMapper<LongWritable,Text,Text,Text>{
-
-
staticenumClaimsCounters{MISSING,QUOTED};
-
-
publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
-
Stringfields[]=value.toString().split(",",-20);
-
Stringcountry=fields[4];
-
StringnumClaims=fields[8];
-
-
if(numClaims.length()>0&&!numClaims.startsWith("\"")){
-
context.write(newText(country),newText(numClaims+",1"));
-
}
-
}
-
}
-
-
publicstaticclassReduceextendsReducer<Text,Text,Text,DoubleWritable>{
-
-
-
publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
-
doublesum=0;
-
intcount=0;
-
for(Textvalue:values){
-
Stringfields[]=value.toString().split(",");
-
sum+=Double.parseDouble(fields[0]);
-
count+=Integer.parseInt(fields[1]);
-
}
-
context.write(key,newDoubleWritable(sum/count));
-
}
-
}
-
-
publicstaticclassCombineextendsReducer<Text,Text,Text,Text>{
-
-
-
publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException{
-
doublesum=0;
-
intcount=0;
-
for(Textvalue:values){
-
Stringfields[]=value.toString().split(",");
-
sum+=Double.parseDouble(fields[0]);
-
count+=Integer.parseInt(fields[1]);
-
}
-
context.write(key,newText(sum+","+count));
-
}
-
}
-
-
-
publicintrun(String[]args)throwsException{
-
-
Jobjob=newJob();
-
job.setJarByClass(AveragingWithCombiner.class);
-
-
FileInputFormat.addInputPath(job,newPath(args[0]));
-
FileOutputFormat.setOutputPath(job,newPath(args[1]));
-
-
job.setJobName("AveragingWithCombiner");
-
job.setMapperClass(MapClass.class);
-
job.setCombinerClass(Combine.class);
-
job.setReducerClass(Reduce.class);
-
job.setInputFormatClass(TextInputFormat.class);
-
job.setOutputFormatClass(TextOutputFormat.class);
-
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
-
System.exit(job.waitForCompletion(true)?0:1);
-
return0;
-
}
-
-
publicstaticvoidmain(String[]args)throwsException{
-
intres=ToolRunner.run(newConfiguration(),newAveragingWithCombiner(),args);
-
System.exit(res);
-
}
-
-
}
分享到:
相关推荐
Eclipse是一款广泛使用的Java集成开发环境,可以用来编写和调试Hadoop Map/Reduce程序。通过以下步骤在Eclipse中配置Hadoop: 1. 设置Hadoop主目录,指向Hadoop安装位置。 2. 创建Hadoop的远程工作区,指定HDFS中的...
在开发MapReduce程序时,还要注意优化性能,例如合理设置分区器(Partitioner)、Combiner(如果适用)以及优化数据本地性,以提高计算效率。同时,日志处理和异常处理也非常重要,可以帮助开发者更好地跟踪和调试...
例如,避免在 Map 或 Reduce 函数中进行不必要的计算,减少对象创建,以及利用缓存和批处理。 通过综合运用这些优化策略,可以显著提升 Hadoop MapReduce Job 的处理效率,同时确保数据处理的准确性和可扩展性。...
Hadoop Combiner是MapReduce编程模型中的一个重要组件,它可以减少发送到Reducer的数据量,从而提高网络效率和Reduce端的效率。下面是Hadoop Combiner的使用方法详解: Combiner的优点 1. 减少发送到Reducer的数据...
为了提高存储效率和传输速度,Hadoop支持对数据进行压缩。常见的压缩算法有Gzip、BZip2、Snappy等。序列化则用于将对象转换为可以存储或传输的形式,Hadoop支持多种序列化格式,如Writables、JSON、Thrift等。 ####...
- **Combiner 使用**:在 Map 端使用 Combiner 函数预先聚合数据,减少网络传输量。 #### 五、案例分析 - **WordCount 示例**:这是一个经典的 MapReduce 示例,用于统计文本文件中单词出现的次数。 - **Inverted ...
Combiner可以在本地节点上提前减少数据传输,提高效率;Partitioner则控制中间结果发送到哪个Reducer,通常基于键的哈希值。 对于实际应用,书中可能提供了案例研究,如日志分析、网页排名(PageRank)计算、关联...
Hadoop Map/Reduce 是一个软件框架,用于编写能够并行处理海量数据(多太字节级别的数据集)的应用程序。这些应用程序可以在大量由商用硬件组成的集群(数千个节点)上可靠且容错地运行。 一个典型的 Map/Reduce ...
标题中的"hadoop-training-map-reduce-example-4"表明这是一个关于Hadoop MapReduce的教程实例,很可能是第四个阶段或示例。Hadoop是Apache软件基金会的一个开源项目,它提供了分布式文件系统(HDFS)和MapReduce...
2. Map/Reduce Slots调优:Map/Reduce Slots是Hadoop中的并发度参数,通过调整这个参数可以提高Hadoop的并发处理能力。 3.Job调优:Job是Hadoop中的计算任务,可以通过调整Job的参数来提高计算效率。 四、Hadoop...
在大数据处理领域,Hadoop MapReduce是一种分布式计算框架,它允许开发者编写并运行处理大规模数据集的应用程序。本文将深入探讨如何使用MapReduce和Java来实现等值连接操作,这是一种在数据库查询中常见的操作,...
此外,还可以考虑使用更高级的分布式索引结构,如Bloom Filter或Lucene等,以提高索引质量和查询效率。 总的来说,“Hadoop倒排索引程序”是Hadoop并行框架在文本处理和信息检索领域的成功实践,它展示了大数据处理...
### Hadoop集群配置及MapReduce开发手册知识点梳理 #### 一、Hadoop集群配置说明 ##### 1.1 环境说明 ...通过以上步骤,可以有效地配置和优化Hadoop集群,提高MapReduce程序的运行效率和稳定性。
2. **代码优化**:利用Hadoop的特性,如Combiner和Partitioner,来提高MapReduce程序的性能。 3. **测试与调试**:充分利用插件提供的调试功能,对代码进行充分的单元测试,确保程序的正确性。 六、总结 Hadoop ...
例如,他们可能会使用Combiner来减少网络传输的数据量,提高效率;或者采用自定义分区器(Partitioner)来确保相同键的数据被发送到同一个Reducer,从而优化并行处理。 为了运行这个名为"hw2"的Hadoop程序,你需要...
使用StringBuilder代替StringBuffer和Formatter,可以提高字符串拼接的效率;而在处理配置文件和词典时,可以利用DistributedCache来加载这些文件到TaskTracker节点上,从而避免在map和reduce阶段重复读取外部文件。...
10. **Combiner**:在Map阶段对输出进行局部聚合,减少Shuffle阶段的数据传输量。 11. **Committer**:负责将最终结果写入到指定位置。 12. **Compression**:支持数据的压缩和解压缩功能。 13. **...
在Hadoop 2.x中,YARN(Yet Another Resource Negotiator)引入,将资源管理和任务调度分离,提高了系统的灵活性和效率。 9. **Hadoop配置**:在使用Hadoop Java API时,通常需要通过`Configuration`对象设置参数,...
- Combiner:在map阶段使用本地聚合,减少中间数据的大小。 - Reducer数量调整:根据数据规模和计算资源来设置合适的Reducer数量,过多或过少都可能影响效率。 通过Hadoop MapReduce进行词频统计,不仅可以应用于...