MapReduce设计模式:Chaining
Chaining这种设计模式非常重要,主要是因为你通常无法通过单个MapReduce Job来完成工作;某些Job必须串行,因为前者Job的输出会成为下个Job的输入;某些Job可以并行,因为Job运行之间没有关系;有些Job的Mapper是对日志的重复处理,需要将代码规范化。不管怎么说,不能靠单个MapReduce程序就完成工作是我们的挑战。
我们这节来讨论下Chaining模式,由于Hadoop的设计是为了MapReduce程序的高速运行,单个Job完全没有问题,但是这个并不能解决工作中的问题。在 工作中,通常任务根本无法通过单个Job完成,除非是像WordCount这样的任务。也就是说对于多阶段的MapReduce程序控制上需要我们做更多的编码,对Job进行控制。如果你感觉任务过于复杂,控制难于从心时,看下这个Oozie这个Apache Project,这个Oozie我也没有细看,能够通过配置对Job进行更为细致的控制。我们本节的任务不是讨论MapReduce框架处理Job的复杂性,不过对于Job执行的优劣必须知道。
下面我们讨论Chaining设计模式的实现。
首先,回忆下我们讨论过的TOP-K问题,会怎么实现呢?
首先我们先对输出的数据进行排序,算出总排序,这个需要一个Job;然后再对这个Job的输出作为下一个Job的输入,计算TOP-K数据,代码不再单独列出,思想理解就行。
这个计算方法就利用了一种很重要的Chaining方法,Serial Chaining。这个是指将MapReduce Job逐个串起来,按照顺序执行。用Unix-Style方式表示就是:
mapreduce-1 | mapreduce-2 | mapreduce-3 | ...
这个是最直观的Mapreduce Job的编写方式。通过这种控制,我们能完成工作,但是问题在于:这样写出来的代码如果Job比较多的话,控制各个mapreduce的执行状态会让代码特别丑;如果只有少数几个的话,还是可以接受的,如果Job串行过多,这种方法要慎用。
其实Hadoop的Mapreduce支持依赖方式的编码。如果有这么个场景:mapreducea处理来自数据库的数据集合,mapreduceb处理来自Hbase的数据集合,mapreducec则对这两个集合进行Join操作,Join操作的内容参考http://isilic.iteye.com/blog/1772945。这样这三个mapreduce就有了依赖关系。mapreducea和mapreduceb两者可以并行执行,mapreducec则依赖于前两者的执行结果。这个用上面提到的Serial Chaining也不好解决。
实际上,Hadoop实现的mapreduce程序已经帮助我们完成相关的控制;在Job中,可以设置依赖的Job,直接提交最终的Job,MapReduce会控制依赖关系的执行:
mapreducec.addDependingJob(mapreducea); mapreducec.addDependingJob(mapreduceb); submit(mapreducea,mapreduceb,mapreducec);
注意这个并不是真正的mapreduce依赖,是Job的依赖,这里只是点明三者的关系。
使用这种方法,需要对Job的执行状态进行监控,确保三者都执行完毕后,主程序再退出。
上面两种方法都是对于Job依赖的情况,前者是自己控制,后者定义好依赖关系,提交给系统控制。不过现在又这么一种情况,在Document Information Retrieval中,需要对文档的Stop word进行处理,还需要对Stemming进行继续的处理,后续再进行lemmatization处理,处理完成后作为Term进行后续的正常处理,如词频统计,分词等。对于Document的一系列操作难道要写多个Mapreduce吗?不用,Hadoop提供了ChainMapper和ChainReducer来处理这种情况。我们来详细学习下ChainMapper和ChainReducer这两个方法。
ChainMapper:在单个map节点执行多个mapper逻辑,这个类似于Unix的管道操作,管道处理记录;其中前者的处理输出作为下一个的mapper的输入,mapper chain执行完毕后,才会进入到partitioner阶段,后面会继续进入到Reducer阶段。
ChainReducer:难道你会认为ChainReducer的功能和ChainMapper类似吗?No,完全不是。ChainReducer中可以设置单个Reducer和多个Mapper,执行完Reducer之后会再顺序执行Mapper的逻辑(吐槽下这个ChainReducer的命名),跟Reducer Chain完全没有关系。
按照官网的说法,ChainMapper的优点是更好的利用IO,在大多数情况下,单个Mapper也能完成ChainMapper的内容,不过这样的设计更好的是软件工程的功能化、内聚化的体现。ChainReducer能偶更好地利用reducer节点,方便实现reducer输出后的进一步数据逻辑处理,实际上就是ChainReducer中的Mapper处理逻辑。
本文开始的mapreduce程序可以逻辑表述为:
[ map | reduce]+
ChainMapper和ChainReducer用逻辑关系式可以表示为:
[ map* | reducer mapper* ]
这个逻辑表达式一定要理解数据流的走向,最好能结合代码看下。
将上面的内容结合起来,就可以得到本节的核心内容,表示为:
[ map* | reducer mapper* ]+
这个关系式如果能理解,再结合自己工作中的逻辑,就算是深入的理解本节的内容了。
最后贴个wordcount的代码,根据本节的内容做了个更改,逻辑性方面就不要再追究了,仅供演示使用。
代码不做解释,大家就是需要理解一下这个过程:
public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map ( Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> { public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException { String k = key.toString(); context.write(new Text(k.toUpperCase()), value); } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce ( Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class ThresholdFilterMapper extends Mapper<Text, IntWritable, Text, IntWritable> { public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException { int cnt = value.get(); if (cnt > 10) context.write(key, value); } } public static void main ( String[] args ) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } JobConf jc = new JobConf("JobControl"); Job job = new Job(jc); Configuration tokenConf = new Configuration(false); ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, tokenConf); Configuration upperConf = new Configuration(false); ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperConf); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); Configuration reducerConf = new Configuration(false); ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, reducerConf); Configuration thresholdConf = new Configuration(false); ChainReducer.addMapper(job, ThresholdFilterMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, thresholdConf); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
相关推荐
本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...
MapReduce 设计模式知识点总结 MapReduce 设计模式是大数据处理的核心组件,负责将大规模数据处理和分析任务分解为可并行处理的任务。MapReduce 设计模式主要由两个阶段组成:Map 阶段和 Reduce 阶段。 Map 阶段 ...
MapReduce设计模式.pdf
, 由于本书不会过多涉及底层框架及MapReduce API,所以希望读者阅读《MapReduce设计模式》之前,能够对Hadoop系统有所了解,知道如何编写MapReduce程序,并了解MapReduce程序框架的工作原理。《MapReduce设计模式》...
MapReduce设计模式.pdf 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除!
[奥莱理] MapReduce 设计模式 (英文版) [奥莱理] MapReduce Design Patterns Building Effective Algorithms and Analytics for Hadoop and Other Systems (E-Book) ☆ 出版信息:☆ [作者信息] Donald Miner, ...
单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数...其次,确定 MapReduce 程序的设计思路。把文件内容分
MapReduce设计模式 ,值得一看
书中主要介绍编程模式,即如何利用MapReduce框架解决一类问题,重在提供解决问题的方法和思路。作者花大量篇幅介绍各种模式的原理及实现机制,并给出相应的应用实例,让读者对每种模式能有更直观的理解。
这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686
HDFS是一个分布式的文件系统,设计用于存储大量数据,并且能够提供高吞吐量的数据访问。 在上传文件到HDFS时,文中遇到了一个错误提示“put: `words.txt': No such file or directory”。这可能意味着文件名、路径...
【MapReduce:并行计算模型】 MapReduce是Google为大数据处理设计的一种编程模型,主要用于处理和生成大规模数据集。它将复杂任务分解为两个阶段——Map和Reduce,Map阶段将数据集切分为可处理的片段,由多个工作...
MapReduce的设计使得开发者能够轻松处理PB级别的数据,而无需关注底层的分布式细节。 BigTable是一个大规模分布式数据库,适合存储非结构化和半结构化的数据。它使用列族模型,允许快速读取和写入大量数据。...