- 浏览: 194422 次
文章分类
最新评论
MapReduce : Combiner的使用(以平均数为例) 并结合in-mapper design pattern 实例
- 博客分类:
- Hadoop
没有使用Combiner 和 in-mapper desgin pattern import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class digitaver1 { public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] ss = value.toString().split(":"); context.write(new Text(ss[0]), new IntWritable(Integer.parseInt(ss[1]))); } } public static class reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int sum = 0; int cnt = 0; while(value.iterator().hasNext()){ sum += value.iterator().next().get(); cnt+=1; } context.write(key, new DoubleWritable((double)sum/(double)cnt)); } } public static void main(String[] args) { try { Job job = new Job(); job.setJarByClass(digitaver1.class); job.setJobName("digitaver1"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(mapper.class); job.setReducerClass(reducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); System.exit( job.waitForCompletion(true) ? 0 : 1 ); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 使用Combiner public static class mapper extends Mapper<LongWritable, Text, Text, pair>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] ss = value.toString().split(":"); pair p = new pair(Integer.parseInt(ss[1]), 1); context.write(new Text(ss[0]), p); } } public static class combiner extends Reducer<Text, pair, Text, pair>{ @Override protected void reduce(Text key, Iterable<pair> value, Context context) throws IOException, InterruptedException { int sum = 0; int cnt = 0; while(value.iterator().hasNext()){ pair p = value.iterator().next(); sum += p.getLeft().get(); cnt += p.getRight().get(); } context.write(key, new pair(sum,cnt)); } } public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{ @Override protected void reduce(Text key, Iterable<pair> value, Context context) throws IOException, InterruptedException { int sum = 0; int cnt = 0; while(value.iterator().hasNext()){ pair p = value.iterator().next(); sum += p.getLeft().get(); cnt += p.getRight().get(); } context.write(key, new DoubleWritable((double)sum/(double)cnt)); } } main函数都一样 使用in-mapper design pattern public static class mapper extends Mapper<LongWritable, Text, Text, pair>{ private Map<String,String> map ; @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub map = new HashMap<String, String>(); } //处理完所有的输入文件再一起传给reducer或者combiner //以前map在执行过程中会一边执行一边讲输出的部分结构先传输给reducer 按照上面的话 效率会不会受影响? //虽然数据少了,但是开始的时间也推迟了??堵塞延迟小了?? //负载平衡??网络中总的数据量少了?? @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] ss = value.toString().split(":"); if(!map.containsKey(ss[0])){ map.put(ss[0], ss[1]+":"+1); }else{ String tmp = map.get(ss[0]); String[] tt = tmp.split(":"); int ta = Integer.parseInt(ss[1])+Integer.parseInt(tt[0]); int tb = Integer.parseInt(tt[1])+1; map.put(ss[0], ta+":"+tb); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for(Map.Entry<String, String> e : map.entrySet()){ String[] tt = e.getValue().split(":"); pair p = new pair(Integer.parseInt(tt[0]), Integer.parseInt(tt[1])); context.write(new Text(e.getKey()), p); } } } public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{ @Override protected void reduce(Text key, Iterable<pair> value, Context context) throws IOException, InterruptedException { int sum = 0; int cnt = 0; while(value.iterator().hasNext()){ pair p = value.iterator().next(); sum += p.getLeft().get(); cnt += p.getRight().get(); } context.write(key, new DoubleWritable((double)sum/(double)cnt)); } }
in-mapper design pattern:单个mapper结果进行聚集
Combiner:所有的mapper结果进行聚集
发表评论
-
多表join的一个优化思路
2012-11-20 11:24 1470big table:streamed small table: ... -
好的网站
2012-09-20 22:17 7911. http://www.cnblogs.com/luche ... -
Hadoop 任务流程
2012-09-07 16:18 829简单的来说分为四个阶段:InputFormat/MapTask ... -
Hadoop关于最大map reducer数目
2012-08-14 20:53 987mapred-site.xml文件: <prop ... -
java.io.IOException:Typemismatch in key from map:expected org.apache.hadoop.io
2012-08-14 20:53 1459解决办法: jo ... -
HDFS 输入文件避免切分
2012-08-14 20:52 1124自定义InputFormat的子类,并把重载方法 ... -
Hadoop 开启debug信息
2012-08-14 20:51 4000运行hadoop程序时,有时候你会使用一些System. ... -
Hadoop 关于0.95/1.75 * (number of nodes)误解
2012-08-14 20:51 986reduce任务槽,即集群能够同时运行的redu ... -
MapReduce ReadingList
2012-08-09 12:22 7031. http://www.aicit.org/jcit/gl ... -
"hadoop fs 和hadoop dfs的区别"
2012-05-30 15:27 1936粗略的讲,fs是个比较抽象的层面,在分布式环境中,fs就是df ... -
Hadoop 自动清除日志
2012-05-29 18:02 949hadoop集群跑了很多的任务后 在hadoop.log ... -
DistributedCache FileNotFoundException
2012-05-26 18:02 1000此时注意两种文件路径表示形式,一个在HDFS中。一一个是本地文 ... -
Cygwin 不支持native lib 不支持使用native lib 提供的压缩
2012-05-25 13:33 1151弄了一个上午hadoop的压缩,一直报错NullPointer ... -
Hadoop 在Window下搭建 守护进程启动问题
2012-05-23 15:27 827hadoop version “0.20.2” java ... -
Cygwin ssh Connection closed by ::1
2012-05-17 21:09 1145在Win7下Cygwin中,使用sshlocalhost命令, ... -
Eclipse:Run on Hadoop 没有反应
2012-05-10 20:11 902hadoop-0.20.2下自带的eclise插件没有用,需要 ... -
Hadoop SequcenceFile 处理多个小文件
2012-04-29 11:04 3889利用sequenceFile打包多个小文件,MapFile是s ... -
Hadoop 自定义计数器
2012-04-22 09:04 1498public static class mapper e ... -
MapReduce : 新版API 自定义InputFormat 把整个文件作为一条记录处理
2012-04-10 21:47 2295自定义InputFormat 新版API 把真个文件当成 ... -
Hadoop NameNode backup
2012-03-24 18:12 861NameNode: <property> ...
相关推荐
总之,《MapReduce: Simplified Data Processing on Large Clusters》这篇论文为分布式计算领域引入了一种高效且易于使用的编程模型,极大地简化了大数据处理的复杂度,并成为了现代云计算基础设施的重要组成部分。
这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686
MapReduce 编程模型简介 MapReduce 是一种编程模型,由 Jeffrey Dean 和 Sanjay Ghemawat 于 2004 年提出,用于处理大规模数据集的分布式计算。该模型将计算任务分解成两个主要阶段:Map 和 Reduce。Map 阶段将...
### MapReduce:简化大型集群上的数据处理 #### 概述 MapReduce是一种编程模型及其相应的实现方式,旨在处理和生成大型数据集。该技术由谷歌的Jeffrey Dean和Sanjay Ghemawat提出,用于解决大规模数据处理的问题。...
### MapReduce:简化大型集群上的数据处理 #### 概述 MapReduce是一种高效的数据处理模型,主要用于处理和生成大规模数据集。它通过将数据处理任务分解为“映射(Map)”和“归并(Reduce)”两个阶段,极大地简化...
在MapReduce中,数据被分割成多个块,然后由“mapper”进行处理,将原始输入数据转换为中间键值对。 在Hadoop生态系统中,Mapper是一个用户编写的Java程序,它的主要任务是解析输入数据,执行一些预处理操作,并...
在大数据处理领域,MapReduce是一种广泛使用的分布式计算框架,由Google提出并被Hadoop采纳为标准组件。本案例主要探讨如何使用MapReduce来求取数据集的行平均值,这在数据分析、数据挖掘以及日志分析等场景中非常...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
Each pattern is explained in context, with pitfalls and caveats clearly identified to help you avoid common design mistakes when modeling your big data architecture. This book also provides a complete...
3. **编写Mapper代码**:如"morphline-hbase-mapper.xml"所示,配置Mapper以读取HBase表,使用Morphlines处理数据,并将结果发送到Solr。 4. **运行MapReduce作业**:使用这个配置启动一个MapReduce作业,该作业...
赠送jar包:hadoop-mapreduce-client-core-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-core-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-core-2.5.1-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
mapreduce创建代码项目mvn原型:generate -DarchetypeGroupId = org.apache.maven.archetypes -DgroupId = org.conan.mymahout -DartifactId = myPro -DpackageName = org.conan.mymahout -Dversion = 1.0-SNAPSHOT ...
在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词计数的基本思路和具体执行过程。下面将介绍如何编写具体实现代码及如何运行程序。 首先,在本地创建 3 个文件:file00l、file002 和 ...
赠送jar包:hadoop-mapreduce-client-app-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-app-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-app-2.6.5-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:hadoop-mapreduce-client-app-2.7.3.jar; 赠送原API文档:hadoop-mapreduce-client-app-2.7.3-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-app-2.7.3-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:hadoop-mapreduce-client-app-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-app-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-app-2.6.5-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:hadoop-mapreduce-client-core-2.6.5.jar 赠送原API文档:hadoop-mapreduce-client-core-2.6.5-javadoc.jar 赠送源代码:hadoop-mapreduce-client-core-2.6.5-sources.jar 包含翻译后的API文档:...
分布式文件系统经典实例——MapReduce:统计字符数 在大数据处理领域,MapReduce是一种广泛使用的编程模型,由Google提出并应用于大规模数据集的并行计算。这个实例将深入讲解如何利用MapReduce框架来统计文本中的...