`

YARN/MRv2 MRAppMaster深入剖析—作业生命周期

 
阅读更多

本节分析一个作业从开始运行到运行结束,所经历的整个过程,期间涉及到的各种事件和状态变化。

在正式讲解作业生命周期之前,先要了解MRAppMaster中作业表示方式,每个作业由若干干Map Task和Reduce Task组成,每个Task进一步由若干个TaskAttempt组成,Job、Task和TaskAttempt的生命周期均由一个状态机表示,具体可参考https://issues.apache.org/jira/browse/MAPREDUCE-279(附件中的图yarn-state-machine.job.png,yarn-state-machine.task.png和yarn-state-machine.task-attempt.png)

作业的创建入口在MRAppMaster类中,如下所示:

public class MRAppMaster extends CompositeService {

  public void start() {

    ...

    job = createJob(getConfig());//创建Job

    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);

    jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask

    startJobs();//启动作业,这是后续一切动作的触发之源

    ...

  }

protected Job createJob(Configuration conf) {

  Job newJob =

    new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),

      taskAttemptListener, jobTokenSecretManager, fsTokens, clock,

      completedTasksFromPreviousRun, metrics, committer, newApiCommitter,

      currentUser.getUserName(), appSubmitTime, amInfos, context);

  ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

  dispatcher.register(JobFinishEvent.Type.class,

    createJobFinishEventHandler());

  return newJob;

  }

}

(1)作业/任务初始化

JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和

ReduceTask,代码如下:

public static class InitTransition

  implements MultipleArcTransition<JobImpl, JobEvent, JobState> {

  ...

  createMapTasks(job, inputLength, taskSplitMetaInfo);

  createReduceTasks(job);

  ...

}

其中,createMapTasks函数实现如下:

private void createMapTasks(JobImpl job, long inputLength,

  TaskSplitMetaInfo[] splits) {

  for (int i=0; i < job.numMapTasks; ++i) {

    TaskImpl task =

      new MapTaskImpl(job.jobId, i,

      job.eventHandler,

      job.remoteJobConfFile,

      job.conf, splits[i],

      job.taskAttemptListener,

job.committer, job.jobToken, job.fsTokens,

job.clock, job.completedTasksFromPreviousRun,

job.applicationAttemptId.getAttemptId(),

job.metrics, job.appContext);

job.addTask(task);

}

}

(2)作业启动

public class MRAppMaster extends CompositeService {

protected void startJobs() {

JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);

dispatcher.getEventHandler().handle(startJobEvent);

}

}

JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调度:

public static class StartTransition

implements SingleArcTransition<JobImpl, JobEvent> {

public void transition(JobImpl job, JobEvent event) {

job.scheduleTasks(job.mapTasks);

job.scheduleTasks(job.reduceTasks);

}

}

这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的任务所经历的状态变化如下(不包含失败或者被杀死情况):

【总结】

本文分析只是起到抛砖引入的作用,读者如果感兴趣,可以自行更深入的研究以下内容:

(1)Job、Task和TaskAttempt状态机设计(分别在JobImpl、TaskImpl和TaskAttemptImpl中)

(2)在以下几种场景下,以上三个状态机的涉及到的变化:

1) kill job

2) kill task attempt

3) fail task attempt

4) container failed

5) lose node

分享到:
评论

相关推荐

    YARN(MRv2)搭建

    YARN(MRv2)搭建

    yarn-v0.23.2.tar.gz

    yarn-v0.23.2.tar.gz 在安装ambari,源码编译的时候下载的文件有问题 手动下载 地址 https://github.com/yarnpkg/yarn/releases/download/v0.23.2/yarn-v0.23.2.tar.gz

    Flink on Yarn_K8S原理剖析及实践.pdf

    Apache Flink是一个开源的分布式流处理框架,用于在...通过这些资源,用户可以深入了解Flink在分布式环境中的运行机制,掌握在YARN/K8S平台上部署Flink集群的最佳实践,并对Flink的核心概念和技术细节有更全面的认识。

    【自动化脚本】自动启动hdfs/yarn/spark HA集群

    脚本使用:vim编辑脚本,按照自己的配置修改主机号,我的是hadoop1、2是NN;hadoop2、3是Spark Master;hadoop3还是RM;hadoop4、5、6是DN、NM、Spark Worker。编辑完成后在满足“前提”的任意一台主机运行均可。 ...

    yarn-v1.22.5.tar.gz

    Yarn 的使用方法基本与 npm 类似,比如 `yarn init` 创建新项目,`yarn add` 添加依赖,`yarn remove` 移除依赖,`yarn upgrade` 升级依赖,`yarn install` 安装项目依赖等。然而,由于 Yarn 的特性,这些操作的执行...

    YARN应用开发与核心源码剖析.pdf

    - **Application Master (AM)**:负责整个应用的生命周期管理,包括向YARN申请资源、启动容器等。 - **Application Worker**:执行实际任务的组件,不是所有应用都需要实现这一部分。 #### 5. YARN开发中的主要协议...

    YARN框架代码详细分析

    MRAppMaster负责管理MapReduce作业的生命周期,包括任务的调度、执行和监控等。它与ResourceManager交互,请求和释放执行任务所需的容器资源。 YARN框架还包括JobHistoryServer,它用于记录作业运行历史,提供作业...

    Yarn框架代码详细分析

    2. 节点管理器(NodeManager, NM):每个节点上的守护进程,负责该节点上的资源使用情况,以及监控容器(Container)的生命周期。 3. 应用程序主节点(ApplicationMaster, AM):负责管理运行在YARN上的每一个应用...

    YARN Essentials.PDF

    - **MapReduce v2 (MRv2):** 作为 YARN 的一部分,支持新的 API,提高了灵活性和性能。 **2. MapReduce v1 与 MapReduce v2 的对比:** - **API 变更:** MRv2 提供了新的 API,允许开发者编写更复杂的应用程序。 ...

    使用yarn create umi安装Ant Design Pro时报错TypeError: self.env.emit is not a function

    TypeError: self.env.emit is not a function at /usr/local/share/.config/yarn/global/node_modules/yeoman-generator/lib/index.js:653:22 at processTicksAndRejections (internal/process/task_queues.js:97:5)...

    Yarn的工作机制.pptx

    RM确认后,释放所有占用的Container,标志着整个MapReduce作业的生命周期结束。 YARN通过分离资源管理和作业调度,实现了多用户、多任务的高效并发执行,提升了Hadoop集群的利用率和整体性能。此外,YARN的这种设计...

    【Yarn篇01】Yarn工作机制和作业提交流程1

    【Yarn工作机制和作业提交流程】是Hadoop生态系统中至关重要的一部分,它负责管理和调度分布式计算资源,确保高效地执行MapReduce等运算程序。Yarn,全称Yet Another Resource Negotiator,是一个资源调度平台,它的...

    一种基于YARN的高优先级作业调度实现方案

    YARN(Yet Another Resource Negotiator)是Apache Hadoop项目的一个子项目,其设计初衷是为了更好地管理大数据框架中的资源分配,以提升资源利用率和作业调度的效率。YARN通过引入资源管理器、节点管理器、应用程序...

    YARN.Essentials

    文档还通过实例演示了MRv1和MRv2的使用方法,包括输入文件的准备、作业的执行和结果的查看。 5. YARN的管理 这一部分详细介绍了YARN的容器管理,包括容器的分配和应用配置。深入讲解了YARN的调度策略,包括FIFO、...

    Yarn框架代码详细分析V0.3.pdf

    - 协调MapReduce作业的生命周期管理。 - 请求和管理资源以执行Map和Reduce任务。 - **实现细节:** - 通过与ResourceManager交互获取资源。 - 通过NodeManager启动和监控任务容器。 **2.2.4 MRYarnChild** - *...

    使用 Redis,Redict,Valkey,等进行 express-rate-limit 的速率限制存储 .zip

    rate-limit-redis redis中间件的存储 。...安装来自 npm 注册表# Using npm> npm install rate-limit-redis# Using yarn or pnpm> yarn/pnpm add rate-limit-redis来自 Github 发布# Using npm> npm install ...

    Yarn编程ApplicationList

    在这个主题中,我们将深入探讨"Yarn编程ApplicationList",包括如何实现应用列表的查询、应用的kill操作,以及Job的查询和map/reduce任务数量的查询。 首先,`Yarn编程ApplicationList`涉及到的主要概念是YARN的...

    Cloudera的yarn任务监控api

    AM通过与Resource Manager交互来请求资源,管理任务的生命周期,并监控任务进度。历史服务器则提供了对已完成作业的详细日志和性能指标的访问。 MR Application Master API允许开发者获取关于MapReduce作业的状态、...

Global site tag (gtag.js) - Google Analytics