
MapReduce: Using a Combiner (averaging as the example), together with an in-mapper design pattern example


 

Without a Combiner or the in-mapper design pattern


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;



public class digitaver1 {
	
	public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			// each input line has the form "key:value"
			String[] ss = value.toString().split(":");
			context.write(new Text(ss[0]), new IntWritable(Integer.parseInt(ss[1])));
		}
		
	}

	public static class reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{

		@Override
		protected void reduce(Text key, Iterable<IntWritable> value,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			// single pass over all values for this key
			for (IntWritable v : value) {
				sum += v.get();
				cnt += 1;
			}
			context.write(key, new DoubleWritable((double)sum/(double)cnt));
		}
	}
	
public static void main(String[] args) {
		
		try {
			Job job = new Job();
			job.setJarByClass(digitaver1.class);
			job.setJobName("digitaver1");
			
			FileInputFormat.addInputPath(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			
			job.setMapperClass(mapper.class);
			job.setReducerClass(reducer.class);
			
			job.setInputFormatClass(TextInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(DoubleWritable.class);
			
			System.exit( job.waitForCompletion(true) ? 0 : 1 );
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
		
	}
}
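
Assuming input lines of the form key:value (made-up sample data), a run of this first version behaves like this:

input:
a:1
a:3
b:2

output (key, then the average, tab-separated):
a	2.0
b	2.0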


Using a Combiner

Note that the reducer above cannot double as the combiner: its input and output types differ (IntWritable in, DoubleWritable out), and an average of averages is not the overall average. The mapper therefore emits a (sum, count) pair, which a combiner can pre-aggregate safely.
	public static class mapper extends Mapper<LongWritable, Text, Text, pair>{

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			// emit (value, 1): a partial (sum, count) that the combiner can merge
			pair p = new pair(Integer.parseInt(ss[1]), 1);
			context.write(new Text(ss[0]), p);
		}
		
	}
	
	public static class combiner extends Reducer<Text, pair, Text, pair>{

		@Override
		protected void reduce(Text key, Iterable<pair> value,
				Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			// single pass over the partial (sum, count) pairs for this key
			for (pair p : value) {
				sum += p.getLeft().get();
				cnt += p.getRight().get();
			}
			context.write(key, new pair(sum,cnt));
		}
		
	}
	
	public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{

		@Override
		protected void reduce(Text key, Iterable<pair> value,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			// single pass over the partial (sum, count) pairs for this key
			for (pair p : value) {
				sum += p.getLeft().get();
				cnt += p.getRight().get();
			}
			context.write(key, new DoubleWritable((double)sum/(double)cnt));
		}
	}
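
The custom pair Writable that both the combiner version and the in-mapper version emit is never shown in the post. A minimal sketch, assuming two IntWritable fields behind the getLeft()/getRight() accessors used above (needs imports for java.io.DataInput, java.io.DataOutput and org.apache.hadoop.io.Writable):

	public static class pair implements Writable {

		private IntWritable left;   // partial sum
		private IntWritable right;  // partial count

		// Hadoop instantiates Writables reflectively, so a no-arg constructor is required
		public pair() {
			this(0, 0);
		}

		public pair(int left, int right) {
			this.left = new IntWritable(left);
			this.right = new IntWritable(right);
		}

		public IntWritable getLeft() { return left; }
		public IntWritable getRight() { return right; }

		@Override
		public void write(DataOutput out) throws IOException {
			left.write(out);
			right.write(out);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			left.readFields(in);
			right.readFields(in);
		}
	}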

The main function is almost the same; the combiner version must additionally register the combiner and change the map output value class, as sketched below.
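
The driver lines that differ, sketched (everything else in main stays as in the first version):

			job.setMapOutputValueClass(pair.class);   // the mapper now emits pair, not IntWritable
			job.setCombinerClass(combiner.class);     // register the local pre-aggregation step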


Using the in-mapper design pattern
	public static class mapper extends Mapper<LongWritable, Text, Text, pair>{

		// per-task state: key -> "sum:count", accumulated across all map() calls
		private Map<String, String> map;

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			map = new HashMap<String, String>();
		}

		//nothing is emitted until the whole input has been processed; only then are the aggregates handed to the reducer or combiner
		//previously the map task would ship partial output onward while still running -- done this way, does efficiency suffer?
		//the data volume shrinks, but output also starts later?? is congestion delay smaller??
		//load balancing?? total data volume on the network is lower??
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			if(!map.containsKey(ss[0])){
				// first occurrence of this key: sum = value, count = 1
				map.put(ss[0], ss[1]+":"+1);
			}else{
				// update the running "sum:count" entry for this key
				String tmp = map.get(ss[0]);
				String[] tt = tmp.split(":");
				int ta = Integer.parseInt(ss[1])+Integer.parseInt(tt[0]);
				int tb = Integer.parseInt(tt[1])+1;
				map.put(ss[0], ta+":"+tb);
			}
			}
		}
		
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			// only now, after all input has been consumed, emit one (key, (sum, count)) per distinct key
			for(Map.Entry<String, String> e : map.entrySet()){
				String[] tt = e.getValue().split(":");
				pair p = new pair(Integer.parseInt(tt[0]), Integer.parseInt(tt[1]));
				context.write(new Text(e.getKey()), p);
			}
		}
	}
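
As an aside (not part of the original post), the same pattern can avoid the repeated string parsing by keeping a mutable int[]{sum, count} per key. A sketch of the parts of the mapper that change:

		// variant: store mutable {sum, count} arrays instead of "sum:count" strings
		private Map<String, int[]> map;

		@Override
		protected void setup(Context context) {
			map = new HashMap<String, int[]>();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			int[] agg = map.get(ss[0]);
			if (agg == null) {
				map.put(ss[0], new int[] { Integer.parseInt(ss[1]), 1 });
			} else {
				agg[0] += Integer.parseInt(ss[1]);
				agg[1] += 1;
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			for (Map.Entry<String, int[]> e : map.entrySet()) {
				context.write(new Text(e.getKey()), new pair(e.getValue()[0], e.getValue()[1]));
			}
		}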
	
	public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{

		@Override
		protected void reduce(Text key, Iterable<pair> value,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			// single pass over the partial (sum, count) pairs for this key
			for (pair p : value) {
				sum += p.getLeft().get();
				cnt += p.getRight().get();
			}
			context.write(key, new DoubleWritable((double)sum/(double)cnt));
		}
	}
in-mapper design pattern: aggregation happens inside each individual mapper, in memory, before anything is emitted.
Combiner: aggregation happens on the emitted output of the mappers, and the framework decides whether and how often to run it.
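
A worked example on made-up input makes the difference concrete. Suppose one map task reads these four lines:

a:1
a:3
b:2
a:2

In the first version the map task emits four records: (a,1), (a,3), (b,2), (a,2). In the combiner version the mapper emits (a,(1,1)), (a,(3,1)), (b,(2,1)), (a,(2,1)), and the framework may (but is not guaranteed to) run the combiner to collapse this to (a,(6,3)) and (b,(2,1)) before the shuffle. With in-mapper combining, the mapper itself emits exactly (a,(6,3)) and (b,(2,1)) from cleanup(), independent of any framework decision. In every case the reducer computes a -> 6/3 = 2.0 and b -> 2/1 = 2.0.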