`

MapReduce设计模式:Local Aggregation

 
阅读更多

MapReduce设计模式:Local Aggregation

MapReduceGoogle提出的一个软件架构,用于大规模数据集的并行计算。其中MapReduce是其主要思想,都是从函数式编程中借过来的,还有矢量编程的影子在里面。

MapReduce编程模式极大的简化了开发人员的并发编程模型处理。开发者只需要关心如何分割、调度、计算、错误处理等,其他传输、管理、冗余、容错等都由MapReduce框架处理,该处理模型能够有效的利用分布式系统的丰富资源。

 

MapReduce框架的讲解不是本文的内容,但是MapReduce管理、处理数据的过程和本文的内容非常相关,下面我们会详细介绍下MapReduce处理数据的流程。

 

MapReduce所显示的那样,MapReduce过程可以算是框架的核心内容了,但是如果你想充分利用MapReduce的计算能力,尽可能减小系统处理数据的瓶颈,下面这些内容的理解也是必不可少的:

 

Record Reader:从HDFS文件系统中读取数据,LineRecordReaderKeyValueLineRecordReaderSequenceFileRecordReaderStreamBaseRecordReader等各种RecordReader,来方便开发者控制输入数据的值。

Map:这个是MapReduce的核心之一,用户根据自己的业务逻辑从输入数据中提取自己感兴趣的信息。

Combiner:这个Combiner实在每个Map之后,在将数据提交之前,对于Map结果的聚合,Combiner使用合适的话,能够极大的降低网络IO

Partition:将Map的结果输出到Reduce处理的机器上。

Shuffle:将不同Map的结果输出到Reduce的机器上

Sort:输出到同一个Reduce机器上的数据进行排序

ReduceMapReduce的另外一个核心内容,实际上实现的是Map内容的聚合内容

Output Format:将Reduce结果输出

 

这个过程需要深入理解,包括各个阶段的功能、位置等,这些都是MapReduce设计模式的重要内容。

 

Ok,确保自己看懂了上面的内容,我们继续下面的内容,Local Aggregation设计模式实际上是针对MapReduce框架进行优化使用的,如果你不使用Local Aggregation的话,自己的MapReduce逻辑也能够处理;只是时间可能没有显得特别高效,或者系统存在这样那样的瓶颈;本来能用5个小时完成的数据处理,运用本节的内容,有可能1个小时就能完成,这个时间的提升还是很客观的;因此本节的内容并不是可有可无的,相反,正确的理解本节的内容反而有助于日常工作的进行。

 

另外提一点的是,本系列中的内容全部使用伪码,不过对于熟悉MapReduce程序的人来说,这些伪码和实际的代码页没有什么区别。

 

在数据密集型处理场景下,假如有上千万乃至上亿的文档,当然这些文档和网络上的文档相比是小巫见大巫了,我们想统计一下word count,这个很简单,我们很容易写出来word count的例程:

class Mapper
  method map(docid a)
    for all term t in doc a do:
      emit(term t,count 1)
      
class Reducer
  method reduce(term t, counts[c1,c2,c3...])
    sum = 0
    for all count c in counts[c1,c2,c3...] do
      sum = sum + c
    emit(term t,count sum)

把这个MapReduce程序放到集群上实验一下,数据不错;将doc增大十倍,还能出结果;增大一百倍,好吧,尽管时间长,结果还是有的;直接增大一千倍,咦,怎么还没有结果?第二天过来,还是没有结果?(该场景由笔者自馔,不代表真实场景)

 

分析一下:

1:有可能你检查下Job的状态,发现没有错误,Job还在正常运行;

2Job 失败,失败原因有可能是网络IO超时、网络IO过重、内存不足等

 

反正这个MapReduce程序能运行,但不能正常的运行,如果想健壮运行这个wordcount,还是有很多工作需要做,其中一个就是Local Aggregation的内容。

我面我们来简单提升一下wordcount的算法,由于优化是在mapper内,所以可以成为in-mapper local aggregation算法:

class Mapper
  method map(docid a)
    H = new Array
    for all term t in doc a do:
      H(t) = H(t) + 1
    for all term t in H do:
      emit(term t, count H(t))
    
    
class Reducer
  method reduce(term t, counts[c1,c2,c3...])
    sum = 0
    for all count c in counts[c1,c2,c3...] do
      sum = sum + c
    emit(term t,count sum)

 

提升的过程比较简单,就是将一个Doc内的term先聚合,再将结果输出。这么做的好处是能够降低单个Doc内的输出的字段数。

 

如果只是聚合单个doc内的term,我为什么不再进一步呢?我将单个Map机器上的docterm聚合后再输出呢?看下这个思路的内容:

class Mapper
  method setup
    H = new Array
  method map(docid a)
    for all term t in doc a do:
      H(t) = H(t) + 1
  method cleanup
    for all term t in H do:
      emit(term t, count H(t))
    
class Reducer
  method reduce(term t, counts[c1,c2,c3...])
    sum = 0
    for all count c in counts[c1,c2,c3...] do
      sum = sum + c
    emit(term t,count sum)

 

这个Mapper的内容就有点复杂了,每个Mapper会在启动时先执行setup方法,初始化Mapper,然后执行map函数,等到Mapper执行完成后,会再执行cleanup方法;Reduce阶段不变。

 

这个MapReduce程序比最开始的程序要快上很多,但是都有个很致命的瓶颈:如果docterm的数据量巨大,超出Mapper机器的内存限制,Mapper同样会失败。

 

这个瓶颈的存在让人很不舒服,如果能根据Mapper机器的内存,既能最大程度的使用内存,又能尽可能的使用in-mapper local aggregation的优势,最好不过了,因此我们可以使用block-mapper local aggregation

class Mapper
  method setup
    H = new Fixed Array
  method map(docid a)
    for all term t in doc a do:
      H(t) = H(t) + 1
    if H is full:
      for all term t in H do:
        emit(term t, count H(t))
  method cleanup
    for all term t in H do:
      emit(term t, count H(t))
    
class Reducer
  method reduce(term t, counts[c1,c2,c3...])
    sum = 0
    for all count c in counts[c1,c2,c3...] do
      sum = sum + c
    emit(term t,count sum)

 

这个优化就很明显了,既能充分使用内存大小,又能避免单个Mapper机器内存溢出的瓶颈,算是比较明显的成果了。

 

word count的实例已经能够显示出这个优化过程了,我们再以求平均数为例来看下这个优化过程;不过过程比较的繁琐,我们直接给出求平均数的最终MapReduce程序。

 

 

class Mapper
  method setup
    S = new Array
    C = new Array
  method map(String t,integer r)
    S{t} = S{t} + 1
    C{t} = C{t} + 1
  method cleanup
    for all term t in S do:
      emit(term t, pair(S{t},C{t}))

class Combiner
  method combine(String t, pair([(s1,c1),(s2,c2)...]))
    sum = 0
    cnt = 0
    for all pair(s,c) in pair([(s1,c1),(s2,c2)...]) do:
      sum = sum + s
      cnt = cnt + c
    emit(String t, pair(sum,cnt))
    
class Reducer
  method reduce(String t, pair([(s1,c1),(s2,c2)...]))
    sum = 0
    cnt = 0
    for all pair(s,c) in pair([(s1,c1),(s2,c2)...]) do:
      sum = sum + s
      cnt = cnt + c
    avg = sum / cnt
    emit(String t,integer avg)

 

 

可以看到,为了极致的使用MapReduce集群的计算能力,,既有in-mapper的优化,又有Combiner内容的优化,如果想进一步优化,可以将block in-mapper也考虑进来,代码也越来越复杂;

 

计算速度在不断优化提升,在计算能力提升的同时,代码的可扩展性也越来越弱;这个就是个鱼与熊掌的抉择;不过针对特定的业务场景,我们总是倾向于选择前者。

 

 

 

 

分享到:
评论

相关推荐

    MapReduce编程实例:单词计数

    本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...

    MapReduce 设计模式

    MapReduce 设计模式,深入理解MapReduce编程模式,更好的利用MapReduce模型

    MapReduce设计模式介绍.ppt

    MapReduce 设计模式知识点总结 MapReduce 设计模式是大数据处理的核心组件,负责将大规模数据处理和分析任务分解为可并行处理的任务。MapReduce 设计模式主要由两个阶段组成:Map 阶段和 Reduce 阶段。 Map 阶段 ...

    MapReduce设计模式.pdf

    MapReduce设计模式.pdf

    MapReduce设计模式

    , 由于本书不会过多涉及底层框架及MapReduce API,所以希望读者阅读《MapReduce设计模式》之前,能够对Hadoop系统有所了解,知道如何编写MapReduce程序,并了解MapReduce程序框架的工作原理。《MapReduce设计模式》...

    MapReduce设计模式高清完整.pdf版

    MapReduce设计模式.pdf 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除!

    MapReduce设计模式 英文版

    <MapReduce设计模式>英文版,概述性的介绍了MapReduce的常见设计模式和应用场景.详细的源码可以帮助理解.

    [MapReduce] MapReduce 设计模式 (英文版)

    [奥莱理] MapReduce 设计模式 (英文版) [奥莱理] MapReduce Design Patterns Building Effective Algorithms and Analytics for Hadoop and Other Systems (E-Book) ☆ 出版信息:☆ [作者信息] Donald Miner, ...

    MapReduce实例分析:单词计数

    单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数...其次,确定 MapReduce 程序的设计思路。把文件内容分

    MapReduce设计模式 [(美)迈纳,(美)舒克著][人民邮电出版社][2014.09][213页]

    MapReduce设计模式 ,值得一看

    Hadoop MapReduce 设计模式

    This book will be unique in some ways and familiar in others. First and foremost, this book is obviously about design patterns, which are templates or general guides to solving problems....

    mapreduce 设计模式

    书中主要介绍编程模式,即如何利用MapReduce框架解决一类问题,重在提供解决问题的方法和思路。作者花大量篇幅介绍各种模式的原理及实现机制,并给出相应的应用实例,让读者对每种模式能有更直观的理解。

    hadoop mapreduce 设计模式

    hadoop mapreduce 设计模式,mapreduce 程序编写,英文数据,但是代码结构清晰,容易看懂,适合实战

    MapReduce: Simplified Data Processing on Large Clusters 英文原文

    这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686

    GFS、MapReduce和BigTable:Google的三种大数据处理系统.docx

    【MapReduce:并行计算模型】 MapReduce是Google为大数据处理设计的一种编程模型,主要用于处理和生成大规模数据集。它将复杂任务分解为两个阶段——Map和Reduce,Map阶段将数据集切分为可处理的片段,由多个工作...

    GFS、MapReduce和BigTable:Google的三种大数据处理系统.pdf

    MapReduce的设计使得开发者能够轻松处理PB级别的数据,而无需关注底层的分布式细节。 BigTable是一个大规模分布式数据库,适合存储非结构化和半结构化的数据。它使用列族模型,允许快速读取和写入大量数据。...

    MapReduce: Simplified Data Processing on Large Clusters中文版

    MapReduce 模型的编程模式包括两个主要函数:Map 和 Reduce。Map 函数处理输入的键值对,并产生一组中间的键值对。Reduce 函数处理中间键值对,并合并这些值,形成一个相对较小的值集合。该模型的实现可以让程序员不...

    深入理解MapReduce架构设计与实现原理 高清 完整书签

    《深入理解MapReduce架构设计与实现原理》是一本专注于大数据处理技术MapReduce的专业书籍,由阿里专家撰写。MapReduce是Google提出的一种分布式计算模型,它为海量数据的处理提供了强大的计算能力,尤其在大规模...

    第一个Mapreduce程序.pdf

    本文介绍了用Java编写并运行第一个mapreduce作业的步骤及遇到的问题和解决方案。

Global site tag (gtag.js) - Google Analytics