`

MapReduce设计模式:Chaining

 
阅读更多

MapReduce设计模式:Chaining

 

Chaining这种设计模式非常重要,主要是因为你通常无法通过单个MapReduce Job来完成工作;某些Job必须串行,因为前者Job的输出会成为下个Job的输入;某些Job可以并行,因为Job运行之间没有关系;有些JobMapper是对日志的重复处理,需要将代码规范化。不管怎么说,不能靠单个MapReduce程序就完成工作是我们的挑战。

 

我们这节来讨论下Chaining模式,由于Hadoop的设计是为了MapReduce程序的高速运行,单个Job完全没有问题,但是这个并不能解决工作中的问题。在 工作中,通常任务根本无法通过单个Job完成,除非是像WordCount这样的任务。也就是说对于多阶段的MapReduce程序控制上需要我们做更多的编码,对Job进行控制。如果你感觉任务过于复杂,控制难于从心时,看下这个Oozie这个Apache Project,这个Oozie我也没有细看,能够通过配置对Job进行更为细致的控制。我们本节的任务不是讨论MapReduce框架处理Job的复杂性,不过对于Job执行的优劣必须知道。

 

下面我们讨论Chaining设计模式的实现。

首先,回忆下我们讨论过的TOP-K问题,会怎么实现呢?

 

首先我们先对输出的数据进行排序,算出总排序,这个需要一个Job;然后再对这个Job的输出作为下一个Job的输入,计算TOP-K数据,代码不再单独列出,思想理解就行。

这个计算方法就利用了一种很重要的Chaining方法,Serial Chaining。这个是指将MapReduce Job逐个串起来,按照顺序执行。用Unix-Style方式表示就是:

mapreduce-1 | mapreduce-2 | mapreduce-3 | ...

 这个是最直观的Mapreduce Job的编写方式。通过这种控制,我们能完成工作,但是问题在于:这样写出来的代码如果Job比较多的话,控制各个mapreduce的执行状态会让代码特别丑;如果只有少数几个的话,还是可以接受的,如果Job串行过多,这种方法要慎用。

其实HadoopMapreduce支持依赖方式的编码。如果有这么个场景:mapreducea处理来自数据库的数据集合,mapreduceb处理来自Hbase的数据集合,mapreducec则对这两个集合进行Join操作,Join操作的内容参考http://isilic.iteye.com/blog/1772945。这样这三个mapreduce就有了依赖关系。mapreduceamapreduceb两者可以并行执行,mapreducec则依赖于前两者的执行结果。这个用上面提到的Serial Chaining也不好解决。

实际上,Hadoop实现的mapreduce程序已经帮助我们完成相关的控制;在Job中,可以设置依赖的Job,直接提交最终的JobMapReduce会控制依赖关系的执行:

mapreducec.addDependingJob(mapreducea);
mapreducec.addDependingJob(mapreduceb);
submit(mapreducea,mapreduceb,mapreducec);

 

注意这个并不是真正的mapreduce依赖,是Job的依赖,这里只是点明三者的关系。

使用这种方法,需要对Job的执行状态进行监控,确保三者都执行完毕后,主程序再退出。

 

上面两种方法都是对于Job依赖的情况,前者是自己控制,后者定义好依赖关系,提交给系统控制。不过现在又这么一种情况,在Document Information Retrieval中,需要对文档的Stop word进行处理,还需要对Stemming进行继续的处理,后续再进行lemmatization处理,处理完成后作为Term进行后续的正常处理,如词频统计,分词等。对于Document的一系列操作难道要写多个Mapreduce吗?不用,Hadoop提供了ChainMapperChainReducer来处理这种情况。我们来详细学习下ChainMapperChainReducer这两个方法。

 

ChainMapper:在单个map节点执行多个mapper逻辑,这个类似于Unix的管道操作,管道处理记录;其中前者的处理输出作为下一个的mapper的输入,mapper chain执行完毕后,才会进入到partitioner阶段,后面会继续进入到Reducer阶段。

 

ChainReducer:难道你会认为ChainReducer的功能和ChainMapper类似吗?No,完全不是。ChainReducer中可以设置单个Reducer和多个Mapper,执行完Reducer之后会再顺序执行Mapper的逻辑(吐槽下这个ChainReducer的命名),跟Reducer Chain完全没有关系。

 

按照官网的说法,ChainMapper的优点是更好的利用IO,在大多数情况下,单个Mapper也能完成ChainMapper的内容,不过这样的设计更好的是软件工程的功能化、内聚化的体现。ChainReducer能偶更好地利用reducer节点,方便实现reducer输出后的进一步数据逻辑处理,实际上就是ChainReducer中的Mapper处理逻辑。

本文开始的mapreduce程序可以逻辑表述为:

 

[ map | reduce]+

 

ChainMapperChainReducer用逻辑关系式可以表示为:

[ map* | reducer mapper* ]

 这个逻辑表达式一定要理解数据流的走向,最好能结合代码看下。

 

将上面的内容结合起来,就可以得到本节的核心内容,表示为:

[ map* | reducer mapper* ]+

这个关系式如果能理解,再结合自己工作中的逻辑,就算是深入的理解本节的内容了。

 

最后贴个wordcount的代码,根据本节的内容做了个更改,逻辑性方面就不要再追究了,仅供演示使用。

代码不做解释,大家就是需要理解一下这个过程:

 

public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map ( Object key, Text value, Context context ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
        public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException {
            String k = key.toString();
            context.write(new Text(k.toUpperCase()), value);
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce ( Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class ThresholdFilterMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
        public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException {
            int cnt = value.get();
            if (cnt > 10)
                context.write(key, value);
        }
    }

    public static void main ( String[] args ) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        JobConf jc = new JobConf("JobControl");
        Job job = new Job(jc);
        Configuration tokenConf = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, tokenConf);
        Configuration upperConf = new Configuration(false);
        ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperConf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        Configuration reducerConf = new Configuration(false);
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, reducerConf);
        Configuration thresholdConf = new Configuration(false);
        ChainReducer.addMapper(job, ThresholdFilterMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, thresholdConf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

分享到:
评论

相关推荐

    MapReduce编程实例:单词计数

    本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...

    MapReduce设计模式介绍.ppt

    MapReduce 设计模式知识点总结 MapReduce 设计模式是大数据处理的核心组件,负责将大规模数据处理和分析任务分解为可并行处理的任务。MapReduce 设计模式主要由两个阶段组成:Map 阶段和 Reduce 阶段。 Map 阶段 ...

    MapReduce设计模式.pdf

    MapReduce设计模式.pdf

    MapReduce设计模式

    , 由于本书不会过多涉及底层框架及MapReduce API,所以希望读者阅读《MapReduce设计模式》之前,能够对Hadoop系统有所了解,知道如何编写MapReduce程序,并了解MapReduce程序框架的工作原理。《MapReduce设计模式》...

    MapReduce设计模式高清完整.pdf版

    MapReduce设计模式.pdf 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除!

    [MapReduce] MapReduce 设计模式 (英文版)

    [奥莱理] MapReduce 设计模式 (英文版) [奥莱理] MapReduce Design Patterns Building Effective Algorithms and Analytics for Hadoop and Other Systems (E-Book) ☆ 出版信息:☆ [作者信息] Donald Miner, ...

    MapReduce实例分析:单词计数

    单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数...其次,确定 MapReduce 程序的设计思路。把文件内容分

    MapReduce设计模式 [(美)迈纳,(美)舒克著][人民邮电出版社][2014.09][213页]

    MapReduce设计模式 ,值得一看

    mapreduce 设计模式

    书中主要介绍编程模式,即如何利用MapReduce框架解决一类问题,重在提供解决问题的方法和思路。作者花大量篇幅介绍各种模式的原理及实现机制,并给出相应的应用实例,让读者对每种模式能有更直观的理解。

    MapReduce: Simplified Data Processing on Large Clusters 英文原文

    这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686

    第一个Mapreduce程序.pdf

    HDFS是一个分布式的文件系统,设计用于存储大量数据,并且能够提供高吞吐量的数据访问。 在上传文件到HDFS时,文中遇到了一个错误提示“put: `words.txt': No such file or directory”。这可能意味着文件名、路径...

    GFS、MapReduce和BigTable:Google的三种大数据处理系统.docx

    【MapReduce:并行计算模型】 MapReduce是Google为大数据处理设计的一种编程模型,主要用于处理和生成大规模数据集。它将复杂任务分解为两个阶段——Map和Reduce,Map阶段将数据集切分为可处理的片段,由多个工作...

    GFS、MapReduce和BigTable:Google的三种大数据处理系统.pdf

    MapReduce的设计使得开发者能够轻松处理PB级别的数据,而无需关注底层的分布式细节。 BigTable是一个大规模分布式数据库,适合存储非结构化和半结构化的数据。它使用列族模型,允许快速读取和写入大量数据。...

Global site tag (gtag.js) - Google Analytics