- 浏览: 563126 次
- 性别:
- 来自: 济南
文章分类
- 全部博客 (270)
- Ask chenwq (10)
- JSF (2)
- ExtJS (5)
- Life (19)
- jQuery (5)
- ASP (7)
- JavaScript (5)
- SQL Server (1)
- MySQL (4)
- En (1)
- development tools (14)
- Data mining related (35)
- Hadoop (33)
- Oracle (13)
- To Do (2)
- SSO (2)
- work/study diary (10)
- SOA (6)
- Ubuntu (7)
- J2SE (18)
- NetWorks (1)
- Struts2 (2)
- algorithm (9)
- funny (1)
- BMP (1)
- Paper Reading (2)
- MapReduce (23)
- Weka (3)
- web design (1)
- Data visualisation&R (1)
- Mahout (7)
- Social Recommendation (1)
- statistical methods (1)
- Git&GitHub (1)
- Python (1)
- Linux (1)
最新评论
-
brandNewUser:
楼主你好,问个问题,为什么我写的如下的:JobConf pha ...
Hadoop ChainMap -
Molisa:
Molisa 写道mapred.min.split.size指 ...
Hadoop MapReduce Job性能调优——修改Map和Reduce个数 -
Molisa:
mapred.min.split.size指的是block数, ...
Hadoop MapReduce Job性能调优——修改Map和Reduce个数 -
heyongcs:
请问导入之后,那些错误怎么解决?
Eclipse导入Mahout -
a420144030:
看了你的文章深受启发,想请教你几个问题我的数据都放到hbase ...
Mahout clustering Canopy+K-means 源码分析
1、Map/Reduce编程模型型原理
利用一个输入key/value pair 集合来产生一个输出的key/value pair
集合。MapReduce
库的用户用两个函数表达这个计算:Map 和Reduce。
Hadoop Map/Reduce实现主要是通过继承Mapper和Reducer两个抽象类,并实现map和reduce两个方法实现的。
Mapper
Mapper 将输入键值对(key/value pair)映射到一组中间格式的键值对集合。
Map是一类将输入记录集转换为中间格式记录集的独立任务。
这种转换的中间格式记录集不需要与输入记录集的类型一致。一个给定的输入键值对可以映射成0个或多个输出键值对。
输出键值对不需要与输入键值对的类型一致。一个给定的输入键值对可以映射成0个或多个输出键值对。
Mapper 的输出被排序后,就被划分给每个Reducer 。分块的总数目和一个作业的reduce任务的数目是一样的。用户可以 通过实现自定义的
Partitioner 来控制哪个key被分配给哪个 Reducer 。
用户可选择通过 JobConf.setCombinerClass(Class) 指定一个combiner
,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper 到 Reducer 数据传输量。
这些被排好序的中间过程的输出结果保存的格式是(key-len, key, value-len, value),应用程序可以通过JobConf
控制对这些中间结果是否进行压缩以及怎么压缩,使用哪种 CompressionCodec 。
需要多少个Map?
Map的数目通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。
Map正常的并行规模大致是每个节点(node)大约10到100个map,对于CPU
消耗较小的map任务可以设到300个左右。由于每个任务初始化需要一定的时间,因此,比较合理的情况是map执行的时间至少超过1分钟。
这样,如果你输入10TB的数据,每个块(block)的大小是128MB,你将需要大约82,000个map来完成任务,除非使用
setNumMapTasks(int) (注意:这里仅仅是对框架进行了一个提示(hint),实际决定因素见这里 )将这个数值设置得更高。
Reducer
Reducer
将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。
用户可以通过 JobConf.setNumReduceTasks(int) 设定一个作业中reduce任务的数目。
概括地说,对Reducer 的实现者需要重写 JobConfigurable.configure(JobConf)
方法,这个方法需要传递一个 JobConf 参数,目的是完成Reducer的初始化工作。然后,框架为成组的输入数据中的每个<key, (list of
values)> 对调用一次 reduce(WritableComparable, Iterator, OutputCollector,
Reporter) 方法。
之后,应用程序可以通过重写Closeable.close() 来执行相应的清理工作。
Reducer 有3个主要阶段:shuffle、sort和reduce。
Shuffle
Reducer 的输入就是Mapper已经排好序的输出。在这个阶段,框架通过HTTP为每个Reducer获得所有Mapper输出中与之相关的分块。
Sort
这个阶段,框架将按照key的值对Reducer 的输入进行分组 (因为不同mapper的输出中可能会有相同的key)。
Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。
需要多少个Reduce?
Reduce的数目建议是0.95 或1.75 乘以 (<no. of nodes > * mapred.tasktracker.reduce.tasks.maximum )。
Reduce的数目建议是0.95 或1.75 乘以 (<no. of nodes > * mapred.tasktracker.reduce.tasks.maximum )。
用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。
增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。
上述比例因子比整体数目稍小一些是为了给框架中的推测性任务(speculative-tasks) 或失败的任务预留一些reduce的资源。
无Reducer
如果没有归约要进行,那么设置reduce任务的数目为零 是合法的。
这种情况下,map任务的输出会直接被写入由 setOutputPath(Path) 指定的输出路径。框架在把它们写入FileSystem
之前没有对它们进行排序。
示例
知道了Map/Reduce相关基础知识,回到我们的主题“MapReduce高级编程之本地聚集与Combinner”。下面以数字求和为例:对一个包含有海量数字的文本文件进行统计,并求出所有数字的和。
例子:对包含有1*10^6(100000)个数字文件,进行分析并求和。
文件格式:
-50 43 20 58 40 64 -95 28 61 55
38 78 -28 96 35 2 3 4 -87 22
-22 63 40 93 -58 81 72 63 93 94
-48 77 40 42 35 86 -66 43 26 70
-21 45 -14 6 21 73 96 31 -90 57
38 78 -28 96 35 2 3 4 -87 22
-22 63 40 93 -58 81 72 63 93 94
-48 77 40 42 35 86 -66 43 26 70
-21 45 -14 6 21 73 96 31 -90 57
解决思路:
第一种方法是用Mapper读取文本文件用StringTokenizer对读取文件内的每一行的数字(Hadoop处理文本文件时,处理时是一行一行记取的)进行分隔,获取每一个数字,然后求和,再将求得的值按Key/Value格式写入Context,最后用Reducer对求得中间值进行汇总求和,得出整个文件所有数字的和。
第二种方法是用Mapper读取文本文件用StringTokenizer对文件内的数字进行分隔,获取每一个数字,并救出文件中该数字有多少个,在合并过程中,求出每个数字在文件中的和,最后用Reducer对求得每个数字求得的和进行汇总求和,得出整个文件所有数字的和。
在给出具体代码之前我们还对本地聚集Combinner的编程规则做说明:
在MapReduce Shuffle中存在着三次排序(Map端两次,reduce端一次),每次排序时便会用上一次Combinner函数, 也就是这个可有可无的函数一旦定义,总共会被三次调用
Combinner的使用需要注意程序的差错性,必须满足使用Combinner时的两个必备条件:
- 由于其执行由MapReduce执行框架负责,因此算法如要保证正确性,必须保证算法结果与combiner无关(即combiner是否执行,执行后产生什么中间结果对算法的最终结果无影响)
- MapReduce中,mapper的输入数据就是reducer的输入数据,因此这两种数据的格式是一致的。conbiner必须服从于这个前提(即combiner的输入数据、输出数据必须为mapper的输出格式,即reducer的输入格式)。
具体代码
1. 第一种思路实现
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class NumberSum { //对每一行数据进行分隔,并求和 public static class SumMapper extends Mapper<Object, Text, Text, LongWritable> { private Text word = new Text("sum"); private static LongWritable numValue = new LongWritable(1); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); long sum = 0; while (itr.hasMoreTokens()) { String s = itr.nextToken(); long val = Long.parseLong(s); sum += val; } numValue.set(sum); context.write(word, numValue); } } // 汇总求和,输出 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); private Text k = new Text("sum"); public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { long v = val.get(); sum += v; } result.set(sum); context.write(k, result); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: numbersum <in> <out>"); System.exit(2); } Job job = new Job(conf, "number sum"); job.setJarByClass(NumberSum.class); job.setMapperClass(SumMapper.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); System.out.println("ok"); } }
2. 第二种思路实现
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class NumberSum { //对每一个数字进行分隔 public static class NumSumMapper extends Mapper<Object, Text, Text, LongWritable> { private Text word = new Text(); private static LongWritable numValue = new LongWritable(1); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); long sum = 0; while (itr.hasMoreTokens()) { String s = itr.nextToken(); word.set(s); context.write(word, numValue); } } } //对每一个数字进行汇总求和 public static class SumCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); private Text k = new Text("midsum"); public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; if (!key.toString().startsWith("midsum")) { for (LongWritable val : values) { sum += val.get(); } long kval = Long.parseLong(key.toString()); long v = kval * sum; result.set(v); context.write(k, result); } else { for (LongWritable val : values) { context.write(key, val); } } } } // 汇总求和,输出 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); private Text k = new Text("sum"); public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { long v = val.get(); sum += v; } result.set(sum); context.write(k, result); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: numbersum <in> <out>"); System.exit(2); } Job job = new Job(conf, "number sum"); job.setJarByClass(NumberSum.class); job.setMapperClass(NumSumMapper.class); job.setCombinerClass(SumCombiner.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); System.out.println("ok"); } }
参考:
[1]Hadoop Map/Reduce编程模型实现海量数据处理—数字求和 魏仁言
[2]MapReduce高级编程之本地聚集与Combinner Fth-Hokage
- HadoopinAction_source_code.zip (24.8 KB)
- 下载次数: 9
- Hadoop_in_Action.rar (3.8 MB)
- 下载次数: 10
发表评论
-
Parallel K-Means Clustering Based on MapReduce
2012-08-04 20:28 1403K-means is a pleasingly paral ... -
Pagerank在Hadoop上的实现原理
2012-07-19 16:04 1463转自:pagerank 在 hadoop 上的实现原理 ... -
Including external jars in a Hadoop job
2012-06-25 20:24 1219办法1: 把所有的第三方jar和自己的class打成一个大 ... -
[转]BSP模型与实例分析(一)
2012-06-15 22:26 0一、BSP模型概念 BSP(Bulk Synchr ... -
Hadoop中两表JOIN的处理方法
2012-05-29 10:35 9631. 概述 在传统数据库(如:MYSQL)中,JOIN ... -
Hadoop DistributedCache
2012-05-27 23:45 1126Hadoop的DistributedCache,可以把 ... -
MapReduce,组合式,迭代式,链式
2012-05-27 23:27 23881.迭代式mapreduce 一些复杂的任务难以用一 ... -
Hadoop ChainMap
2012-05-27 23:09 1986单一MapReduce对一些非常简单的问题提供了很好的支持。 ... -
广度优先BFS的MapReduce实现
2012-05-25 21:47 4312社交网络中的图模型经常需要构造一棵树型结构:从一个特定的节点出 ... -
HADOOP程序日志
2012-05-23 19:53 1015*.log日志文件和*.out日志文件 进入Hadoo ... -
TFIDF based on MapReduce
2012-05-23 11:58 951Job1: Map: input: ( ... -
个人Hadoop 错误列表
2012-05-23 11:31 1491错误1:Too many fetch-failure ... -
Hadoop Map&Reduce个数优化设置以及JVM重用
2012-05-22 11:29 2430Hadoop与JVM重用对应的参数是map ... -
有空读下
2012-05-20 23:59 0MapReduce: JT默认task scheduli ... -
Hadoop MapReduce Job性能调优——修改Map和Reduce个数
2012-05-20 23:46 26754map task的数量即mapred ... -
Hadoop用于和Map Reduce作业交互的命令
2012-05-20 16:02 1225用法:hadoop job [GENERIC_OPTION ... -
Eclipse:Run on Hadoop 没有反应
2012-05-20 11:46 1277原因: hadoop-0.20.2下自带的eclise ... -
Hadoop0.20+ custom MultipleOutputFormat
2012-05-20 11:46 1540Hadoop0.20.2中无法使用MultipleOutput ... -
Custom KeyValueTextInputFormat
2012-05-19 16:23 1716在看老版的API时,发现旧的KeyValueTextInpu ... -
Hadoop SequenceFile Writer And Reader
2012-05-19 15:22 2067package cn.edu.xmu.dm.mpdemo ...
相关推荐
实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...
### 大数据实验四-MapReduce编程实践 #### 一、实验内容与目的 ##### 实验内容概述 本次实验的主要内容是使用MapReduce框架来实现WordCount词频统计功能,即统计HDFS(Hadoop Distributed File System)系统中多个...
大数据实验 实验五:MapReduce 初级编程实践
【MapReduce初级编程实践】是大数据处理中的一项基础任务,主要应用于大规模数据集的并行计算。在这个实验中,我们关注的是如何利用MapReduce来实现文件的合并与去重操作。MapReduce是一种分布式计算模型,由Google...
一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce 2.0的框架结构和源码分析,MapReduce 2.0的配置与测试,MapReduce 2.0运行流程,MapReduce 2.0高级程序设计以及相关特性等内容。...
**大数据技术原理及应用——MapReduce初级编程实践** MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个实验中,我们将学习如何利用MapReduce编程解决实际问题,包括数据去重...
Hadoop MapReduce 是大数据处理的核心组件之一,它提供了一个编程模型和软件框架,用于大规模数据处理。下面是 Hadoop MapReduce 编程实战的知识点总结: MapReduce 编程基础 MapReduce 是一个编程模型,用于处理...
在这个实验报告中,我们将探讨如何在Eclipse环境中设置和使用MapReduce编程。 首先,为了在Eclipse上编写和运行MapReduce程序,我们需要安装`hadoop-eclipse-plugin`。这个插件允许开发者直接在IDE中开发、调试和...
(实践三)MapReduce 布隆过滤器 过滤器训练、过滤器应用、结果验证及分析 (实践四)MapReduce Top 10模式示例 在ctrip数据集上进行Top 10排序。 (实践五)去重的用户—针对ctrip数据集去重 对ctrip数据集中的...
07丨为什么说MapReduce既是编程模型又是计算框架?.html
### MapReduce并行编程模型研究 #### 摘要与背景 MapReduce作为一种高效的数据处理框架,被广泛应用于大规模数据集的处理上。它通过提供简单而强大的编程接口,简化了分布式并行编程的复杂性,使开发人员能够专注...
### MapReduce 编程模型详解 #### 一、引言:MapReduce——大规模数据处理的革新者 在当今数字化时代,大数据的处理已成为各行业关注的焦点。随着互联网的飞速发展,数据量呈指数级增长,传统的数据处理方法已无法...
### MapReduce编程模型详解 #### 一、MapReduce概述与问题背景 MapReduce是一种由Google提出的编程模型,用于处理大规模数据集(通常是TB甚至PB级别的数据)。它通过将大规模的数据处理任务分解为可以在大量普通...
一些MapReduce的入门程序 来自《hadoop权威指南》《hadoop in action》
MapReduce编程实例浅析,讲述如何进行M/R程序开发。
08.mapreduce编程案例--流量统计求和--自定义数据类型.mp4