本节分析一个作业从开始运行到运行结束,所经历的整个过程,期间涉及到的各种事件和状态变化。
在正式讲解作业生命周期之前,先要了解MRAppMaster中作业表示方式,每个作业由若干干Map Task和Reduce Task组成,每个Task进一步由若干个TaskAttempt组成,Job、Task和TaskAttempt的生命周期均由一个状态机表示,具体可参考https://issues.apache.org/jira/browse/MAPREDUCE-279(附件中的图yarn-state-machine.job.png,yarn-state-machine.task.png和yarn-state-machine.task-attempt.png)
作业的创建入口在MRAppMaster类中,如下所示:
public class MRAppMaster extends CompositeService {
public void start() {
...
job = createJob(getConfig());//创建Job
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask
startJobs();//启动作业,这是后续一切动作的触发之源
...
}
protected Job createJob(Configuration conf) {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
}
(1)作业/任务初始化
JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和
ReduceTask,代码如下:
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
...
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
...
}
其中,createMapTasks函数实现如下:
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i &lt; job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
}
}
(2)作业启动
public class MRAppMaster extends CompositeService {
protected void startJobs() {
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
dispatcher.getEventHandler().handle(startJobEvent);
}
}
JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调度:
public static class StartTransition
implements SingleArcTransition&lt;JobImpl, JobEvent&gt; {
public void transition(JobImpl job, JobEvent event) {
job.scheduleTasks(job.mapTasks);
job.scheduleTasks(job.reduceTasks);
}
}
这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的任务所经历的状态变化如下(不包含失败或者被杀死情况):
【总结】
本文分析只是起到抛砖引入的作用,读者如果感兴趣,可以自行更深入的研究以下内容:
(1)Job、Task和TaskAttempt状态机设计(分别在JobImpl、TaskImpl和TaskAttemptImpl中)
(2)在以下几种场景下,以上三个状态机的涉及到的变化:
1) kill job
2) kill task attempt
3) fail task attempt
4) container failed
5) lose node
分享到:
相关推荐
YARN(MRv2)搭建
yarn-v0.23.2.tar.gz 在安装ambari,源码编译的时候下载的文件有问题 手动下载 地址 https://github.com/yarnpkg/yarn/releases/download/v0.23.2/yarn-v0.23.2.tar.gz
Apache Flink是一个开源的分布式流处理框架,用于在...通过这些资源,用户可以深入了解Flink在分布式环境中的运行机制,掌握在YARN/K8S平台上部署Flink集群的最佳实践,并对Flink的核心概念和技术细节有更全面的认识。
脚本使用:vim编辑脚本,按照自己的配置修改主机号,我的是hadoop1、2是NN;hadoop2、3是Spark Master;hadoop3还是RM;hadoop4、5、6是DN、NM、Spark Worker。编辑完成后在满足“前提”的任意一台主机运行均可。 ...
Yarn 的使用方法基本与 npm 类似,比如 `yarn init` 创建新项目,`yarn add` 添加依赖,`yarn remove` 移除依赖,`yarn upgrade` 升级依赖,`yarn install` 安装项目依赖等。然而,由于 Yarn 的特性,这些操作的执行...
- **Application Master (AM)**:负责整个应用的生命周期管理,包括向YARN申请资源、启动容器等。 - **Application Worker**:执行实际任务的组件,不是所有应用都需要实现这一部分。 #### 5. YARN开发中的主要协议...
MRAppMaster负责管理MapReduce作业的生命周期,包括任务的调度、执行和监控等。它与ResourceManager交互,请求和释放执行任务所需的容器资源。 YARN框架还包括JobHistoryServer,它用于记录作业运行历史,提供作业...
2. 节点管理器(NodeManager, NM):每个节点上的守护进程,负责该节点上的资源使用情况,以及监控容器(Container)的生命周期。 3. 应用程序主节点(ApplicationMaster, AM):负责管理运行在YARN上的每一个应用...
- **MapReduce v2 (MRv2):** 作为 YARN 的一部分,支持新的 API,提高了灵活性和性能。 **2. MapReduce v1 与 MapReduce v2 的对比:** - **API 变更:** MRv2 提供了新的 API,允许开发者编写更复杂的应用程序。 ...
TypeError: self.env.emit is not a function at /usr/local/share/.config/yarn/global/node_modules/yeoman-generator/lib/index.js:653:22 at processTicksAndRejections (internal/process/task_queues.js:97:5)...
RM确认后,释放所有占用的Container,标志着整个MapReduce作业的生命周期结束。 YARN通过分离资源管理和作业调度,实现了多用户、多任务的高效并发执行,提升了Hadoop集群的利用率和整体性能。此外,YARN的这种设计...
【Yarn工作机制和作业提交流程】是Hadoop生态系统中至关重要的一部分,它负责管理和调度分布式计算资源,确保高效地执行MapReduce等运算程序。Yarn,全称Yet Another Resource Negotiator,是一个资源调度平台,它的...
YARN(Yet Another Resource Negotiator)是Apache Hadoop项目的一个子项目,其设计初衷是为了更好地管理大数据框架中的资源分配,以提升资源利用率和作业调度的效率。YARN通过引入资源管理器、节点管理器、应用程序...
文档还通过实例演示了MRv1和MRv2的使用方法,包括输入文件的准备、作业的执行和结果的查看。 5. YARN的管理 这一部分详细介绍了YARN的容器管理,包括容器的分配和应用配置。深入讲解了YARN的调度策略,包括FIFO、...
- 协调MapReduce作业的生命周期管理。 - 请求和管理资源以执行Map和Reduce任务。 - **实现细节:** - 通过与ResourceManager交互获取资源。 - 通过NodeManager启动和监控任务容器。 **2.2.4 MRYarnChild** - *...
rate-limit-redis redis中间件的存储 。...安装来自 npm 注册表# Using npm> npm install rate-limit-redis# Using yarn or pnpm> yarn/pnpm add rate-limit-redis来自 Github 发布# Using npm> npm install ...
在这个主题中,我们将深入探讨"Yarn编程ApplicationList",包括如何实现应用列表的查询、应用的kill操作,以及Job的查询和map/reduce任务数量的查询。 首先,`Yarn编程ApplicationList`涉及到的主要概念是YARN的...
AM通过与Resource Manager交互来请求资源,管理任务的生命周期,并监控任务进度。历史服务器则提供了对已完成作业的详细日志和性能指标的访问。 MR Application Master API允许开发者获取关于MapReduce作业的状态、...