`
zy19982004
  • 浏览: 661767 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:251946
社区版块
存档分类
最新评论

Hadoop学习三十四:Hadoop-MapReduce Job本地运行流程

 
阅读更多

一. 版本环境

     以前工作的过程中,陆陆续续看过一些Hadoop1.0 MapReduce的源码,但没有形成体系。现在再次来看,顺便记录。此次学习版本的是Hadoop2.2.0 MapReduce。环境为直接在Win7下Local模式调试。MapReduce。

 

二. Job提交流程

     从Job waitForCompletion开始

1 Job submit

1.1 JobSubmitter submitJobInternal

1.1.1 JobSubmissionFiles.getStagingDir 初始化Job系统工作目录jobStagingArea。如D:\tmp\hadoop-root\mapred\staging\rootXXXXXXXXXX\.staging。

1.1.2 获得JobID。如job_localXXXXXXXXXX_0001。

1.1.3 copyAndConfigureFiles copy Job Jar到submitJobDir = new Path(jobStagingArea, jobId.toString())。

1.1.4 writeSplits,将input划分为split,并将split数据和split元数据写入系统工作目录,最后返回split的数目。input如下

1.1.4.1 调用TextInputFormat getSplits方法获得split,集群环境BlockSize为128M,所以145M的test-data.txt被划分为两个split。相关算法自己去看,提供两个数据BlockLocation[0,134217728,201slave,203slave,202slave, 134217728,18093772,201slave,203slave,202slave],InputSplit[hdfs://192.168.1.200:9000/user/root/input/test-data.txt:0+134217728, hdfs://192.168.1.200:9000/user/root/input/test-data.txt:134217728+18093772]

1.1.4.2 JobSplitWriter.createSplitFiles将split数据和split元数据写入系统工作目录。

1.1.5 writeConf,将配置文件写到系统工作目录。此时系统工作目录如下

1.1.6 LocalJobRunner submitJob

1.1.6.1 new Job

1.1.6.1.1 Job初始化

1.6.1.1.1.1 systemJobDir就是上面的submitJobDir,systemJobFile = submitJobDir\job.xml

1.1.6.1.1.2 将配置文件写入本地工作目录localJobDir\localJobFile。如D:\tmp\hadoop-root\mapred\local\localRunner\root\job_localXXXXXXXXXX_0001\job_localXXXXXXXXXX_0001.xml。此时,本地工作目录如下

1.1.6.1.2 Job run

 

三. Job run流程

 

     Job run方法很大,是整个Job执行的核心框架,自定义的Mapper和Reduce都会在这里被调起。我把这个方法单独拿出来说。

1 创建OutputCommitter

2 从系统工作目录split数据和元数据文件里获得split信息TaskSplitMetaInfo[]

3 根据TaskSplitMetaInfo[]创建List<MapTaskRunnable>,显然会有两个MapTaskRunnable

4 ExecutorService运行每个MapTaskRunnable

4.1创建MapTask并执行run

4.2 runNewMapper

4.2.1 反射创建自定义的Mapper mapper

4.2.2 反射创建InputFormat

4.2.3 从系统工作目录文件里获得此MapTask的split

4.2.4 反射创建RecordReader

4.2.5 反射创建RecordWriter output

4.2.6 创建MapContextImpl

4.2.7 mapper.run(mapperContext),可能涉及到数据的spill。

4.2.8 output.close(mapperContext),涉及到数据的sort spill combin merge。

5 等待每个MapTaskRunnable运行完。但两个MapTaskRunnable都运行完,如下图

6 将Mapper的结果mv & rename到Reduce的本地工作目录,此时

7 创建ReduceTask并执行run

7.1 merge & sort

7.2 runNewReducer

7.2.1 反射创建Reducer

7.2.2 反射创建RecordWriter,准备好临时目录流。参考类FileOutputFormat

/**
   * Get the default path and filename for the output format.
   * @param context the task context
   * @param extension an extension to add to the filename
   * @return a full path $output/_temporary/$taskid/part-[mr]-$id
   * @throws IOException
   */
  public Path getDefaultWorkFile(TaskAttemptContext context,
                                 String extension) throws IOException{
    FileOutputCommitter committer = 
      (FileOutputCommitter) getOutputCommitter(context);
    return new Path(committer.getWorkPath(), getUniqueFile(context, 
      getOutputName(context), extension));
  }

 

7.2.3 调用自己的Reduce,将结果输出到临时目录下

   

8 OutputCommitter将Reduce的结果mv到output下

9 清理以下目录

9.1 系统工作目录systemJobFile.getParent()

9.2 本地工作目录localJobFile 

 

四. 大流程

     最后用一张图总结本文

 

       再补充一下:本地MapReduce执行时  ,有几个线程来运行MapTask
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);

maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.

ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
   实际上就是Math.max(Math.min(maxMapThreads, this.numMapTasks), 1)

 

     不断学习不断补充

     当map和reduce同时存在时,map的结果先flush到硬盘上,reduce时以此为输入,计算完后由FileOutputFormat直接写到临时目录里,最后OutputCommitter将此结果mv到output下;

     如果只有map没有reduce阶段即 job.setNumReduceTasks(0)时又如何。这个时候map的计算结果将通过FileOutputFormat直接写到临时目录里,最后OutputCommitter将此结果mv到output下。

MapTask.runNewMapper

// get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

NewDirectOutputCollector and NewOutputCollector all extends from RecordWriter, different RecordWriter decision to different wirtelocation

 

 

 

1
2
分享到:
评论

相关推荐

    Hadoop学习总结之四:Map-Reduce过程解析

    ### Hadoop MapReduce任务提交与执行流程解析 #### 一、客户端提交任务 在Hadoop MapReduce框架中,客户端的任务提交是整个MapReduce作业启动的关键步骤。这一过程主要由`JobClient`类中的`runJob(JobConf job)`...

    03-Hadoop-MapReduce.docx

    MapReduce工作流程包括JobTracker(在Hadoop 2.x中被ResourceManager替代)、TaskTracker(被NodeManager替代)以及MapTask和ReduceTask。JobTracker负责任务调度,TaskTracker执行实际的任务。 **1.5 WordCount...

    Hadoop应用系列2--MapReduce原理浅析(上)

    Hadoop提供了丰富的工具来支持MapReduce作业的管理和调试,包括`hadoop jar`命令用于提交作业,`hadoop fs`用于文件系统操作,以及`job`和`task`命令用于查看作业和任务的状态。 总结,MapReduce是Hadoop处理大数据...

    大数据-Hadoop-MapReduce介绍

    - **本地模式**:在这种模式下,Job 在单个 JVM 中运行,适用于调试和测试。 - **伪分布式模式**:在这种模式下,Job 在单个机器上模拟多个节点运行,适合于测试和小型集群。 - **完全分布式模式**:这是生产环境中...

    理论部分-MapReduce-hadoop1

    Hadoop是Apache基金会开源的一个实现MapReduce的框架,它使得在大规模集群上运行MapReduce任务变得简单易行。以下将详细介绍MapReduce作业的执行流程及其核心组件。 1. **MapReduce作业执行流程** - **代码编写**...

    阿里云 专有云企业版 V3.9.0 E-MapReduce 用户指南 20200330.pdf

    - **作业提交**:支持Hadoop MapReduce、Spark Job等作业提交方式。 - **监控与管理**:提供丰富的监控指标,帮助用户监控集群健康状态和作业执行情况。 - **优化调优**:根据业务需求和性能指标,进行参数调整和...

    使用命令行编译打包运行自己的MapReduce程序 Hadoop2.6.0

    ### 使用命令行编译打包运行自己...以上就是使用命令行编译打包运行自己的MapReduce程序的过程详解,包括了Hadoop 2.6.0版本的变化、编译打包流程、运行命令解析以及使用Eclipse进行开发的方法。希望对初学者有所帮助。

    大数据实验5实验报告:MapReduce 初级编程实践

    运行这个MapReduce作业时,我们需要配置Hadoop环境,指定输入文件(A和B)的位置以及输出文件(C)的路径。通过Hadoop的`Job`类和相关输入输出格式类,可以设置这些参数并提交作业到Hadoop集群执行。 总结起来,这...

    Hadoop源码解析---MapReduce之InputFormat

    在Hadoop的生态系统中,MapReduce是处理海量数据的一种编程模型,而InputFormat作为MapReduce编程模型的重要组成部分,是负责处理输入数据的关键接口。为了深入理解MapReduce工作原理,必须掌握InputFormat的设计和...

    hadoop命令集

    - `hadoop job -kill &lt;job-id&gt;`:杀死指定的作业。例如: - `hadoop job -kill job_201005310937_0053`:杀死ID为job_201005310937_0053的作业。 4. **其他命令** - `hadoop namenode -format`:格式化NameNode...

    Hadoop-eclipse-plugin-2.7.6下载与说明

    5. **本地运行与调试**:Hadoop-eclipse-plugin提供了在本地运行MapReduce作业的功能。在Java应用程序的主类上右键单击,选择“Run As” -&gt; “Map/Reduce Job”,然后选择本地运行或连接到远程集群。 6. **连接到...

    seminario-mapreduce:用于 Ciemat-UEX Hadoop 会议的资源

    【标题】"seminario-mapreduce:用于 Ciemat-UEX Hadoop 会议的资源" ...总的来说,这个压缩包提供的资源将为学习者提供一个全面了解和深入实践Hadoop MapReduce和YARN的平台,帮助他们在大数据处理领域提升技能。

    java下hadoop开发使用jar包

    `hadoop-mapreduce-client-core`则包含MapReduce的基本操作,比如Job提交、任务调度等。 在Java项目中,这些jar包通常通过Maven或Gradle等构建工具管理。在Maven的`pom.xml`文件中,你可以添加如下依赖来引入Hadoop...

    eclipse运行mr插件hadoop-eclipse-plugin-2.6.0.jar

    5. **编写MapReduce程序**: 使用Eclipse编写Java代码实现MapReduce任务,然后右键点击项目,选择`Run As` -&gt; `Hadoop Job`来提交作业到远程Hadoop集群。 6. **监控作业状态**: 插件还会在Eclipse的`Progress`视图中...

    Hadoop命令使用手册中文版

    ### Hadoop命令使用手册中文版知识点详解 #### 一、Hadoop概述 Hadoop是一款开源软件框架,主要用于处理大规模数据集(通常在集群环境中)。它能够高效地存储和处理非常大的数据集,使得用户能够在相对较低成本的...

    Hadoop集群部署及测试实验(三).docx

    - **学习JAVA API调用Hadoop接口:** 掌握如何利用JAVA编程语言编写MapReduce程序,并通过Hadoop API进行数据处理。 - **实现多文本文件的倒排索引功能:** 使用MapReduce技术处理多文本文件,建立一个高效的倒排...

    Hadoop MapReduce实现tfidf源码

    在大数据处理领域,Hadoop MapReduce是一种广泛应用的分布式计算框架,它使得在大规模数据集上进行并行计算成为可能。本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document ...

    Hadoop-mapreduce过程.doc

    本文将从客户端、JobTracker、TaskTracker和Child四个角度,详细阐述MapReduce的工作流程。 1. **客户端提交任务** MapReduce的执行始于客户端提交一个任务。这一过程主要由`JobClient.runJob(JobConf)`静态函数...

Global site tag (gtag.js) - Google Analytics