`
ssydxa219
  • 浏览: 626273 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

MapReduce

 
阅读更多

MapReduce

        MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,MapReduce程序

        本质上是并行运行的,因此可以解决海量数据的计算问题.

 

        MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段.每个阶段都以键

         值对作为输入和输出.用户只需要实现map()和reduce()两个函数即可实现分布式计算.

 

    执行步骤:

        map任务处理:

            1.读取输入文件内容,解析成键值对(key/value).对输入文件的每一行,解析成

                键值对(key/value).每一个键值对调用一次map函数

            2.写自己的逻辑,对输入的键值对(key/value)处理,转换成新的键值对

                (key/value)输出.

            3.对输出的键值对(key/value)进行分区.(partition)

            4.对不同分区的数据,按照key进行排序,分组.相同的key/value放到

                一个集合中.(shuffle)

            5.分组后的数据进行规约.(combiner,可选择的)

    reduce任务处理:

        1.对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点.

        2.对多个map任务的输出进行合并,排序.写reduce函数自己的逻辑,对输入的

            key/value处理,转换成新的key/value输出.

        3.把reduce的输出保存到文件中(写入到hdfs中).

 

    MapReduce作业流程:

        1.代码编写

        2.作业配置(输入输出路径,reduce数量等等)

        3.作业提交

            3.1通过JobClient提交,与JobTracker通信得到一个jar的存储路径和JobId.

            3.2检查输入输出的路径

            3.3计算分片信息.

            3.4将作于所需要的资源(jar,配置文件,计算所得的输入分片)赋值到以作业ID

                  命名的HDFS上

            3.5告知JobTracker作业准备执行.

        4.作业初始化

            当JobTracker接收到提交过来的作业后,会把次调用放入一个内部队列中,

             交由作业调度器进行调度,默认是(FIFO),并对其初始化.

            初始化:创建一个表示正在运行作业的对象--分装任务和记录信息,以便

             跟踪任务的状态和进程.

            为了创建任务列表,作业调度器首先从共享文件系统中获取已经计算好的

             输入分片信息.然后为每一个分片创建一个map任务,调度器创建相应数量

             的要运行的reduce任务.此时,任务被指定ID.

        5.任务分配

            tasktracker运行一个简单的循环来定期发送"心跳"给JobTracker,

             心跳告知JobTracker,tasktracker是否还存活,同时指明tasktracker是否

             已经准备好运行新的任务,如果是,JobTracker会分配给它一个任务.

        6.任务执行

            tasktracker拿到任务后

            1.会将所有的信息拷贝到本地(包括jar,代码,配置信息,分片信息等)

            2.tasktracker为任务新建一个本地工作目录,并把jar文件中的内容解压到

                这个目录下.

            3.tasktracker新建一个TaskRunner实例来运行该任务.

              TaskRunner会启动一个新的JVM来运行每个步骤.(防止其他软件影响

                到tasktracker,但是在不同的任务之间重用JVM是有可能的.

        7.进度和状态的更新

            task会定期向tasktracker汇报执行情况,tasktracker会定期收集所在

             集群上的所有task信息,并想JobTracker汇报.JobTracker会根据所有

             tasktracker汇报上来的信息进行汇总

        8.作业完成

             JobTracker是在接收到最后一个任务完成后,才将任务标记为"成功".

            并将数据结果写入到HDFS上.

        PS:

            JobTracker职能:负责接收用户提交的作业,负责启动,跟踪任务执行

            tasktracker职能:负责执行任务

    作业失败:

        1.JobTracker失败

            这是最为严重的一种任务失败,失败机制--它是一个单节点故障,因此,

             作业注定失败.(hadoop2.0解决了)

        2.tasktracker失败

            tasktracker崩溃了会停止向jobt发送心跳信息,并且JobTracker会将

             tasktracker从等待的任务池中移除,将该任务转移到其他的地方执行.

             JobTracker会将tasktracker加入到黑名单.

        3.task失败

             map或reduce运行失败,会向tasktracker抛出异常,任务挂起.

 

MapReduce启动流程:

             start-mapred.sh  --> hadoop-daemon.sh --> hadoop 

            -->org.apache.hadoop.mapred.JobTracker

    Jobtracker调用顺序:

            main --> startTracker  --> new JobTracker 在其构造方法中首先创建

             一个调度器,接着创建一个RPC的server(interTrackerServer)

             tasktracker会通过PRC接触与其通信,然后调用offerService方法对外

             提供服务,在offerService方法中启动RPC server,初始化jobtracker,

             调用taskScheduler的start方法 --> eagerTaskInitializationListener

             调用start方法,接着调用jobInitManagerThread的start方法,

             因为其是一个线程,会调用JobInitManager的run方法,随后

             jobInitQueue任务队列去取第一个任务,然后把它丢入线程池中,

            再调用-->InitJob的run方法,再然后调用jobTracker的initJob方法

             --> JobInProgress的initTasks 

             --> maps = new TaskInProgress[numMapTasks]和

             reduces = new TaskInProgress[numReduceTasks];

     TaskTracker调用顺序:

            main --> new TaskTracker在其构造方法中调用了initialize方法,

             在initialize方法中调用RPC.waitForProxy,得到一个jobtracker的

             代理对象,接着TaskTracker调用了本身的run方法,

             --> offerService方法  --> transmitHeartBeat返回值是

              (HeartbeatResponse)是jobTracker的指令,在transmitHeartBeat

             方法中InterTrackerProtocol调用了heartbeat将tasktracker的状态

             通过RPC机制发送给jobTracker,返回值就是JobTracker的指

             令,heartbeatResponse.getActions()得到具体的指令,然后判断指令

             的具体类型,开始执行任务,addToTaskQueue动类型的指令

             加入到队列当中,TaskLauncher又把任务加入到任务队列当中,

               -->  TaskLauncher的run方法 --> startNewTask方法 

             --> localizeJob下载资源 --> launchTaskForJob开始加载任务

             --> launchTask  --> runner.start()启动线程; --> 

            TaskRunner调用run方法 --> launchJvmAndWait启动java child进程

 

MapReduce的细节

    序列化概念

        序列化:是指把结构化对象转化为字节流.

        反序列化:是序列化的逆过程.即:把字节流转回结构化对象.

    hadoop序列化格式特点:

        1.紧凑:高效使用存储空间

        2.快速:读写数据的额外开销小

        3.可扩展:可透明的读取老格式的数据

        4.互操作:支持多语言交互.

    hadoop序列化的作用:

        序列化在分布式环境的两大作用:进程间的通信,永久存储.

        hadoop节点间通信.

 

Partitioner编程

    将有一些共同特性的数据,写入到同一个文件里.

 

排序和分组

    在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。

    如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,

     才能参与比较。如果想自定义排序规则,被排序的对象要实现

     WritableComparable接口,在compareTo方法中实现排序规则,

     然后将这个对象当做k2,即可完成排序分组时也是按照k2进行比较的。

 

Combiners编程

    1.每一个map会产生大量的输出,combiner的作用就是在map端对输出

       先做一次合并,以减少传输到reducer的数据量.

    2.combiner最基本是实现本地key的归并,具有类似本地reduce功能

      如果不用combiner,那么所有的结果都是reduce完成,效率会相对降低,

    3.使用combiner,先完成的map会在本地聚合,提升速度.

    PS:Combiner的输出是Reducer的输入,Combiner绝对不能改变最终的计算结果.

       所以从个人角度来说,Combiner只应用于那种Reducer的输入key/value与

        输出key/value类型完全一致,且不影响最终结果的场景.比如:累加,最值等.

   

shuffle

        MapReduce确保每个Reducer的输入都按键排序,系统执行排序的过程

         ---将map输出作为输入传给Reducer---成为shuffle(洗牌)

        1.map函数开始产生输出时,并不是简单的将它写到磁盘.它利用缓冲的

            方式写到内存,并出于效率的考虑进行了预排序.

          每个map任务都有一个环形内存缓冲区,用于存储任务的输出.默认情况下,

            缓冲区大小为100MB,一旦缓冲内容达到阀值(默认为80%),一个后台线程

            便开始把内容写到磁盘指定目录下新建一个溢写文件中.在写到磁盘过程中,

            map输出继续被写到缓冲区,但如果在此期间缓冲区被填满, map会阻塞,

            直到写磁盘过程完成.

        2.写磁盘前,要partition,sort。如果有combiner,combine排序后数据。

        3.等最后记录写完,合并全部溢写文件为一个分区且排序的文件。

        

    Reducer如何知道要从哪个tasktracker取得map输出呢?

        map任务成功完成后,它们会通知其父tasktracker状态已更新,然后tasktracker

        进而通知JobTracker.这些通知在心跳机制中传输.因此,对于指定作业,

        JobTracker知道map输出和tasktracker之间的映射关系.Reducer中的一个

        线程定期询问JobTracker以便获取map输出的位置,直到它获得所有输出位置.

分享到:
评论

相关推荐

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    学生mapreduce成绩分析

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的计算任务分解成两个主要阶段:Map(映射)和Reduce(化简),使得在大规模分布式环境下处理大数据变得可能...

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。数据清洗(Data Cleaning)是数据处理过程中非常重要的一步,旨在清洁和转换原始数据,使其更加可靠和有用。...

    MapReduce实例分析:单词计数

    单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数。本节通过单词计数实例来阐述采用 MapReduce 解决...

    大数据实验四-MapReduce编程实践

    ### 大数据实验四-MapReduce编程实践 #### 一、实验内容与目的 ##### 实验内容概述 本次实验的主要内容是使用MapReduce框架来实现WordCount词频统计功能,即统计HDFS(Hadoop Distributed File System)系统中多个...

    MapReduce 谷歌实验室论文

    MapReduce是一种由谷歌实验室提出的大规模数据处理模型及其相关实现方案。这篇论文详细介绍了MapReduce的概念、工作机制以及在实际中的应用。MapReduce模型通过两个主要函数——Map函数和Reduce函数来处理数据,使得...

    基于MapReduce的Apriori算法代码

    基于MapReduce的Apriori算法代码 基于MapReduce的Apriori算法代码是一个使用Hadoop MapReduce框架实现的关联规则挖掘算法,称为Apriori算法。Apriori算法是一种经典的关联规则挖掘算法,用于发现事务数据库中频繁...

    大数据 hadoop mapreduce 词频统计

    【大数据Hadoop MapReduce词频统计】 大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的...

    mapreduce项目 数据清洗

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。它将复杂的并行计算任务分解成两个主要阶段:Map(映射)和Reduce(化简)。在这个"MapReduce项目 数据清洗"中,我们将探讨...

    Hadoop原理与技术MapReduce实验

    (2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...

    Hadoop之MapReduce编程实例完整源码

    一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...

    基于MapReduce的交互可视化平台

    基于MapReduce的交互可视化平台是一个涉及大数据处理和图形界面展示的IT解决方案,旨在通过可视化的方式增强用户对大数据分析结果的理解。文章详细探讨了如何使用MapReduce这一大数据处理模型,结合GPU加速技术,MPI...

    MapReduce求行平均值--MapReduce案例

    在大数据处理领域,MapReduce是一种广泛使用的分布式计算框架,由Google提出并被Hadoop采纳为标准组件。本案例主要探讨如何使用MapReduce来求取数据集的行平均值,这在数据分析、数据挖掘以及日志分析等场景中非常...

    MapReduce基础.pdf

    ### MapReduce基础知识详解 #### 一、MapReduce概述 **MapReduce** 是一种编程模型,最初由Google提出并在Hadoop中实现,用于处理大规模数据集的分布式计算问题。该模型的核心思想是将复杂的大型计算任务分解成较...

    基于MapReduce的电信数据清洗系统设计与实现

    内容概要:本文详细介绍了如何使用MapReduce框架设计和实现一个电信数据清洗系统,涵盖数据预处理、无效数据过滤、重复数据检测与删除以及数据格式转换等关键技术步骤。通过具体的代码示例,解释了各阶段的实现细节...

    论文:MapReduce: Simplified Data Processing on Large Clusters

    ### MapReduce: 简化的大型集群数据处理 #### 一、引言 《MapReduce: Simplified Data Processing on Large Clusters》这篇论文由Google的研究员Jeffrey Dean和Sanjay Ghemawat撰写,旨在介绍一种名为MapReduce的...

    大数据实验5实验报告:MapReduce 初级编程实践

    【MapReduce初级编程实践】是大数据处理中的一项基础任务,主要应用于大规模数据集的并行计算。在这个实验中,我们关注的是如何利用MapReduce来实现文件的合并与去重操作。MapReduce是一种分布式计算模型,由Google...

    hadoop-mapreduce-client-jobclient-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...

Global site tag (gtag.js) - Google Analytics