`

MapReduce设计模式:Chaining

 
阅读更多

MapReduce设计模式:Chaining

 

Chaining这种设计模式非常重要,主要是因为你通常无法通过单个MapReduce Job来完成工作;某些Job必须串行,因为前者Job的输出会成为下个Job的输入;某些Job可以并行,因为Job运行之间没有关系;有些JobMapper是对日志的重复处理,需要将代码规范化。不管怎么说,不能靠单个MapReduce程序就完成工作是我们的挑战。

 

我们这节来讨论下Chaining模式,由于Hadoop的设计是为了MapReduce程序的高速运行,单个Job完全没有问题,但是这个并不能解决工作中的问题。在 工作中,通常任务根本无法通过单个Job完成,除非是像WordCount这样的任务。也就是说对于多阶段的MapReduce程序控制上需要我们做更多的编码,对Job进行控制。如果你感觉任务过于复杂,控制难于从心时,看下这个Oozie这个Apache Project,这个Oozie我也没有细看,能够通过配置对Job进行更为细致的控制。我们本节的任务不是讨论MapReduce框架处理Job的复杂性,不过对于Job执行的优劣必须知道。

 

下面我们讨论Chaining设计模式的实现。

首先,回忆下我们讨论过的TOP-K问题,会怎么实现呢?

 

首先我们先对输出的数据进行排序,算出总排序,这个需要一个Job;然后再对这个Job的输出作为下一个Job的输入,计算TOP-K数据,代码不再单独列出,思想理解就行。

这个计算方法就利用了一种很重要的Chaining方法,Serial Chaining。这个是指将MapReduce Job逐个串起来,按照顺序执行。用Unix-Style方式表示就是:

mapreduce-1 | mapreduce-2 | mapreduce-3 | ...

 这个是最直观的Mapreduce Job的编写方式。通过这种控制,我们能完成工作,但是问题在于:这样写出来的代码如果Job比较多的话,控制各个mapreduce的执行状态会让代码特别丑;如果只有少数几个的话,还是可以接受的,如果Job串行过多,这种方法要慎用。

其实HadoopMapreduce支持依赖方式的编码。如果有这么个场景:mapreducea处理来自数据库的数据集合,mapreduceb处理来自Hbase的数据集合,mapreducec则对这两个集合进行Join操作,Join操作的内容参考http://isilic.iteye.com/blog/1772945。这样这三个mapreduce就有了依赖关系。mapreduceamapreduceb两者可以并行执行,mapreducec则依赖于前两者的执行结果。这个用上面提到的Serial Chaining也不好解决。

实际上,Hadoop实现的mapreduce程序已经帮助我们完成相关的控制;在Job中,可以设置依赖的Job,直接提交最终的JobMapReduce会控制依赖关系的执行:

mapreducec.addDependingJob(mapreducea);
mapreducec.addDependingJob(mapreduceb);
submit(mapreducea,mapreduceb,mapreducec);

 

注意这个并不是真正的mapreduce依赖,是Job的依赖,这里只是点明三者的关系。

使用这种方法,需要对Job的执行状态进行监控,确保三者都执行完毕后,主程序再退出。

 

上面两种方法都是对于Job依赖的情况,前者是自己控制,后者定义好依赖关系,提交给系统控制。不过现在又这么一种情况,在Document Information Retrieval中,需要对文档的Stop word进行处理,还需要对Stemming进行继续的处理,后续再进行lemmatization处理,处理完成后作为Term进行后续的正常处理,如词频统计,分词等。对于Document的一系列操作难道要写多个Mapreduce吗?不用,Hadoop提供了ChainMapperChainReducer来处理这种情况。我们来详细学习下ChainMapperChainReducer这两个方法。

 

ChainMapper:在单个map节点执行多个mapper逻辑,这个类似于Unix的管道操作,管道处理记录;其中前者的处理输出作为下一个的mapper的输入,mapper chain执行完毕后,才会进入到partitioner阶段,后面会继续进入到Reducer阶段。

 

ChainReducer:难道你会认为ChainReducer的功能和ChainMapper类似吗?No,完全不是。ChainReducer中可以设置单个Reducer和多个Mapper,执行完Reducer之后会再顺序执行Mapper的逻辑(吐槽下这个ChainReducer的命名),跟Reducer Chain完全没有关系。

 

按照官网的说法,ChainMapper的优点是更好的利用IO,在大多数情况下,单个Mapper也能完成ChainMapper的内容,不过这样的设计更好的是软件工程的功能化、内聚化的体现。ChainReducer能偶更好地利用reducer节点,方便实现reducer输出后的进一步数据逻辑处理,实际上就是ChainReducer中的Mapper处理逻辑。

本文开始的mapreduce程序可以逻辑表述为:

 

[ map | reduce]+

 

ChainMapperChainReducer用逻辑关系式可以表示为:

[ map* | reducer mapper* ]

 这个逻辑表达式一定要理解数据流的走向,最好能结合代码看下。

 

将上面的内容结合起来,就可以得到本节的核心内容,表示为:

[ map* | reducer mapper* ]+

这个关系式如果能理解,再结合自己工作中的逻辑,就算是深入的理解本节的内容了。

 

最后贴个wordcount的代码,根据本节的内容做了个更改,逻辑性方面就不要再追究了,仅供演示使用。

代码不做解释,大家就是需要理解一下这个过程:

 

public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map ( Object key, Text value, Context context ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
        public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException {
            String k = key.toString();
            context.write(new Text(k.toUpperCase()), value);
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce ( Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class ThresholdFilterMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
        public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException {
            int cnt = value.get();
            if (cnt > 10)
                context.write(key, value);
        }
    }

    public static void main ( String[] args ) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        JobConf jc = new JobConf("JobControl");
        Job job = new Job(jc);
        Configuration tokenConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, tokenConf);
        Configuration upperConf = new Configuration(false);
        ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperConf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        Configuration reducerConf = new Configuration(false);
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, reducerConf);
        Configuration thresholdConf = new Configuration(false);
        ChainReducer.addMapper(job, ThresholdFilterMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, thresholdConf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

分享到:
评论

相关推荐

    MapReduce编程实例:单词计数

    本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...

    MapReduce 设计模式

    MapReduce 设计模式,深入理解MapReduce编程模式,更好的利用MapReduce模型

    MapReduce设计模式介绍.ppt

    MapReduce 设计模式知识点总结 MapReduce 设计模式是大数据处理的核心组件,负责将大规模数据处理和分析任务分解为可并行处理的任务。MapReduce 设计模式主要由两个阶段组成:Map 阶段和 Reduce 阶段。 Map 阶段 ...

    MapReduce设计模式.pdf

    MapReduce设计模式.pdf

    MapReduce设计模式

    , 由于本书不会过多涉及底层框架及MapReduce API,所以希望读者阅读《MapReduce设计模式》之前,能够对Hadoop系统有所了解,知道如何编写MapReduce程序,并了解MapReduce程序框架的工作原理。《MapReduce设计模式》...

    MapReduce设计模式高清完整.pdf版

    MapReduce设计模式.pdf 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除!

    MapReduce设计模式 英文版

    &lt;MapReduce设计模式&gt;英文版,概述性的介绍了MapReduce的常见设计模式和应用场景.详细的源码可以帮助理解.

    [MapReduce] MapReduce 设计模式 (英文版)

    [奥莱理] MapReduce 设计模式 (英文版) [奥莱理] MapReduce Design Patterns Building Effective Algorithms and Analytics for Hadoop and Other Systems (E-Book) ☆ 出版信息:☆ [作者信息] Donald Miner, ...

    MapReduce实例分析:单词计数

    单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数...其次,确定 MapReduce 程序的设计思路。把文件内容分

    MapReduce设计模式 [(美)迈纳,(美)舒克著][人民邮电出版社][2014.09][213页]

    MapReduce设计模式 ,值得一看

    Hadoop MapReduce 设计模式

    This book will be unique in some ways and familiar in others. First and foremost, this book is obviously about design patterns, which are templates or general guides to solving problems....

    mapreduce 设计模式

    书中主要介绍编程模式,即如何利用MapReduce框架解决一类问题,重在提供解决问题的方法和思路。作者花大量篇幅介绍各种模式的原理及实现机制,并给出相应的应用实例,让读者对每种模式能有更直观的理解。

    MapReduce: Simplified Data Processing on Large Clusters 英文原文

    这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686

    hadoop mapreduce 设计模式

    hadoop mapreduce 设计模式,mapreduce 程序编写,英文数据,但是代码结构清晰,容易看懂,适合实战

    GFS、MapReduce和BigTable:Google的三种大数据处理系统.docx

    【MapReduce:并行计算模型】 MapReduce是Google为大数据处理设计的一种编程模型,主要用于处理和生成大规模数据集。它将复杂任务分解为两个阶段——Map和Reduce,Map阶段将数据集切分为可处理的片段,由多个工作...

    GFS、MapReduce和BigTable:Google的三种大数据处理系统.pdf

    MapReduce的设计使得开发者能够轻松处理PB级别的数据,而无需关注底层的分布式细节。 BigTable是一个大规模分布式数据库,适合存储非结构化和半结构化的数据。它使用列族模型,允许快速读取和写入大量数据。...

    MapReduce: Simplified Data Processing on Large Clusters中文版

    MapReduce 模型的编程模式包括两个主要函数:Map 和 Reduce。Map 函数处理输入的键值对,并产生一组中间的键值对。Reduce 函数处理中间键值对,并合并这些值,形成一个相对较小的值集合。该模型的实现可以让程序员不...

    深入理解MapReduce架构设计与实现原理 高清 完整书签

    《深入理解MapReduce架构设计与实现原理》是一本专注于大数据处理技术MapReduce的专业书籍,由阿里专家撰写。MapReduce是Google提出的一种分布式计算模型,它为海量数据的处理提供了强大的计算能力,尤其在大规模...

    第一个Mapreduce程序.pdf

    本文介绍了用Java编写并运行第一个mapreduce作业的步骤及遇到的问题和解决方案。

Global site tag (gtag.js) - Google Analytics