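2. ResourceManager side
The client submits the Application to the ResourceManager through YARNRunner.submitJob().
The client and the ResourceManager communicate over the ClientRMProtocol protocol, which is implemented by the ClientRMService class.
1) ClientRMService.java
Every client operation against the ResourceManager is ultimately carried out by a method of ClientRMService. Take submitApplication() as an example.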
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnRemoteException {
  // Read the Application's details from the request: context, id and user
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();
  String user = submissionContext.getUser();
  try {
    // Overwrite the user with the short name of the current caller
    user = UserGroupInformation.getCurrentUser().getShortUserName();
    if (rmContext.getRMApps().get(applicationId) != null) {
      throw new IOException("Application with id " + applicationId
          + " is already present! Cannot add a duplicate!");
    }
    submissionContext.setUser(user);
    // Build an RMAppManagerSubmitEvent from the context and pass it to
    // RMAppManager.handle() for processing
    rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
        .currentTimeMillis()));
    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user + " with " + submissionContext);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (IOException ie) {
    ...
  }
  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
2) RMAppManager.java
RMAppManager implements the EventHandler interface, which means the class exists to handle a particular kind of event.
public void handle(RMAppManagerEvent event) {
  ApplicationId applicationId = event.getApplicationId();
  LOG.debug("RMAppManager processing event for "
      + applicationId + " of type " + event.getType());
  // event.getType() shows that this class handles the submission and
  // completion events of an Application
  switch(event.getType()) {
    case APP_COMPLETED:
    {
      finishApplication(applicationId);
      ApplicationSummary.logAppSummary(
          rmContext.getRMApps().get(applicationId));
      checkAppNumCompletedLimit();
    }
    break;
    case APP_SUBMIT:
    {
      ApplicationSubmissionContext submissionContext =
          ((RMAppManagerSubmitEvent)event).getSubmissionContext();
      long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
      // Call submitApplication() to submit the job to the ResourceManager
      submitApplication(submissionContext, submitTime);
    }
    break;
    default:
      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
  }
}
protected synchronized void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime) {
  ApplicationId applicationId = submissionContext.getApplicationId();
  RMApp application = null;
  try {
    // Read the Application's parameters from the submitted context and
    // fill in defaults for any that were left unset
    String clientTokenStr = null;
    ...
    ...
    // Persist the Application's details so that it can be recovered if the
    // Application fails or dies
    ApplicationStore appStore = rmContext.getApplicationsStore()
        .createApplicationStore(submissionContext.getApplicationId(),
        submissionContext);
    // Create the RMAppImpl object that the ResourceManager uses to
    // represent the Application
    application = new RMAppImpl(applicationId, rmContext,
        this.conf, submissionContext.getApplicationName(), user,
        submissionContext.getQueue(), submissionContext, clientTokenStr,
        appStore, this.scheduler,
        this.masterService, submitTime);
    // Reject a duplicate submission of the same Application
    if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
        null) {
      String message = "Application with id " + applicationId
          + " is already present! Cannot add a duplicate!";
      LOG.info(message);
      throw RPCUtil.getRemoteException(message);
    }
    // Notify the ACLs manager
    this.applicationACLsManager.addApplication(applicationId,
        submissionContext.getAMContainerSpec().getApplicationACLs());
    // Security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
      this.rmContext.getDelegationTokenRenewer().addApplication(
          applicationId, parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete()
          );
    }
    // Send RMAppEventType.START to the AsyncDispatcher; the
    // ApplicationEventDispatcher receives the dispatched event and hands it
    // to RMAppImpl
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(applicationId, RMAppEventType.START));
  } catch (IOException ie) {
    LOG.info("RMAppManager submit application exception", ie);
    if (application != null) {
      // Send an RMAppRejectedEvent
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppRejectedEvent(applicationId, ie.getMessage()));
    }
  }
}
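The duplicate check in submitApplication() relies on putIfAbsent being atomic: whichever caller inserts first wins, and any later caller gets the existing entry back. The following is a minimal sketch of that idea using only the JDK. The class name DuplicateSubmitSketch, the register method and the use of String in place of ApplicationId and RMApp are illustrative assumptions, not Hadoop code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DuplicateSubmitSketch {
    // Hypothetical stand-in for rmContext.getRMApps(): id -> application.
    private final ConcurrentMap<String, String> apps = new ConcurrentHashMap<>();

    /** Returns true if the id was newly registered, false if it already existed. */
    boolean register(String applicationId, String application) {
        // putIfAbsent returns the previous value, or null when the key was absent,
        // so the check and the insert happen as one atomic step.
        return apps.putIfAbsent(applicationId, application) == null;
    }

    public static void main(String[] args) {
        DuplicateSubmitSketch rm = new DuplicateSubmitSketch();
        System.out.println(rm.register("application_0001", "first"));   // prints true
        System.out.println(rm.register("application_0001", "second"));  // prints false
    }
}
```

A separate get() followed by put() would leave a window in which two submissions with the same id could both pass the check, which is presumably why RMAppManager repeats the test with putIfAbsent even though ClientRMService already called get().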
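3) EventHandler
From this point on, AsyncDispatcher takes over all event dispatching: every event is routed by AsyncDispatcher to the matching EventDispatcher. The EventDispatcher obtains the object responsible for the event and hands the event to it. Take RMAppEventType.START as an example: the event is dispatched to ApplicationEventDispatcher, which looks up the RMApp implementation, RMAppImpl, in the RMContext and lets it handle the event.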
public static final class ApplicationEventDispatcher implements
    EventHandler<RMAppEvent> {
  private final RMContext rmContext;
  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }
  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    // Look up the object that handles this event
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    if (rmApp != null) {
      try {
        // Hand the event to that object
        rmApp.handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + appID, t);
      }
    }
  }
}
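The routing described above can be sketched in a few lines: handlers are registered per event-type enum class, events go into a queue, and a handler may enqueue follow-up events while it runs. This is a simplified, single-threaded illustration under stated assumptions. MiniDispatcherSketch, enqueue, drain and the two enums are hypothetical names, and the real AsyncDispatcher drains its queue on a separate thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class MiniDispatcherSketch {
    enum AppEventType { START }
    enum AttemptEventType { START }

    // One handler per event-type enum class (hypothetical simplification).
    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();
    private final Deque<Enum<?>> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> typeClass, Consumer<Enum<?>> handler) {
        handlers.put(typeClass, handler);
    }

    void enqueue(Enum<?> eventType) {
        queue.add(eventType);
    }

    /** Delivers queued events in FIFO order until the queue is empty. */
    void drain() {
        Enum<?> next;
        while ((next = queue.poll()) != null) {
            Consumer<Enum<?>> handler = handlers.get(next.getDeclaringClass());
            if (handler == null) {
                throw new IllegalStateException("No handler registered for " + next);
            }
            handler.accept(next);
        }
    }

    /** Wires two handlers so that the first event triggers the second. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniDispatcherSketch d = new MiniDispatcherSketch();
        d.register(AppEventType.class, e -> {
            log.add("app:" + e);
            d.enqueue(AttemptEventType.START); // follow-up event
        });
        d.register(AttemptEventType.class, e -> log.add("attempt:" + e));
        d.enqueue(AppEventType.START);
        d.drain();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [app:START, attempt:START]
    }
}
```

Keying the handler table by the enum class is what lets two event families share a constant name such as START without colliding.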
RMAppImpl.java
public void handle(RMAppEvent event) {
  // Take the write lock before updating the state machine
  this.writeLock.lock();
  try {
    ApplicationId appID = event.getApplicationId();
    LOG.debug("Processing event for " + appID + " of type "
        + event.getType());
    final RMAppState oldState = getState();
    try {
      // Let the state machine process the event
      this.stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      ...
    }
  } finally {
    // Release the lock
    this.writeLock.unlock();
  }
}
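The lock / try / finally shape of handle() guarantees that the lock is released even when the transition throws. The sketch below shows the same pattern with a ReentrantReadWriteLock from the JDK. The class LockedHandlerSketch and its String state are illustrative assumptions, and the assumption that RMAppImpl pairs its writeLock with a read lock in getState() is mine rather than something shown in the listing above.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedHandlerSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    private String state = "NEW";

    /** Readers share the read lock, so concurrent getState() calls do not block each other. */
    String getState() {
        readLock.lock();
        try {
            return state;
        } finally {
            readLock.unlock();
        }
    }

    /** Applies a state change under the write lock and returns the previous state. */
    String handle(String newState) {
        writeLock.lock();
        try {
            // The thread holding the write lock may also take the read lock.
            String oldState = getState();
            if (newState == null) {
                throw new IllegalArgumentException("newState must not be null");
            }
            state = newState;
            return oldState;
        } finally {
            // Runs on both the normal and the exceptional path.
            writeLock.unlock();
        }
    }

    boolean isWriteLocked() {
        return lock.isWriteLocked();
    }

    public static void main(String[] args) {
        LockedHandlerSketch app = new LockedHandlerSketch();
        System.out.println(app.handle("SUBMITTED")); // prints NEW
        System.out.println(app.getState());          // prints SUBMITTED
    }
}
```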
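The state machine works as follows, using the one in RMAppImpl as an example.
/* The type parameters are, from left to right:
   the class whose state changes, the class that represents states,
   the class that represents event types, and the class that represents events.
   The constructor argument is the initial state of the state machine. */
private static final StateMachineFactory<RMAppImpl,
                                         RMAppState,
                                         RMAppEventType,
                                         RMAppEvent> stateMachineFactory
    = new StateMachineFactory<RMAppImpl,
                              RMAppState,
                              RMAppEventType,
                              RMAppEvent>(RMAppState.NEW)
    /* Add a transition between two states together with the class that wraps
       the work to do during the transition.
       The call below says that the transition RMAppState.NEW -> RMAppState.SUBMITTED
       is triggered by an event of type RMAppEventType.START,
       and that the code to run is wrapped in the StartAppAttemptTransition class. */
    .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
        RMAppEventType.START, new StartAppAttemptTransition())
    ...
    ...
    // Build the state machine
    .installTopology();
// A class that wraps the code for a state change must implement the
// SingleArcTransition interface; StartAppAttemptTransition is an example.
// Each state machine has one base class that implements SingleArcTransition;
// here it is RMAppTransition.
// StartAppAttemptTransition extends RMAppTransition and implements transition(),
// which contains the logic for the state change.
private static final class StartAppAttemptTransition extends RMAppTransition {
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.createNewAttempt();
  }
}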
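To make the mechanism concrete, here is a minimal table-driven state machine in the same spirit: a transition is looked up by (current state, event type), its hook runs, and the state moves to the target. This is a sketch under stated assumptions, not the StateMachineFactory implementation. MiniStateMachineSketch, Arc, the attempts counter and the reduced set of states are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

class MiniStateMachineSketch {
    enum State { NEW, SUBMITTED, ACCEPTED, RUNNING }
    enum EventType { START, APP_ACCEPTED, ATTEMPT_REGISTERED }

    /** One arc of the table: the target state plus the hook to run. */
    private static final class Arc {
        final State target;
        final Consumer<MiniStateMachineSketch> hook;

        Arc(State target, Consumer<MiniStateMachineSketch> hook) {
            this.target = target;
            this.hook = hook;
        }
    }

    // Transition table shared by all instances, like a static factory.
    private static final Map<State, Map<EventType, Arc>> TABLE =
        new EnumMap<State, Map<EventType, Arc>>(State.class);

    private static void addTransition(State from, State to, EventType on,
                                      Consumer<MiniStateMachineSketch> hook) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<EventType, Arc>(EventType.class))
             .put(on, new Arc(to, hook));
    }

    static {
        // The hook stands in for StartAppAttemptTransition: it "creates an attempt".
        addTransition(State.NEW, State.SUBMITTED, EventType.START, app -> app.attempts++);
        addTransition(State.SUBMITTED, State.ACCEPTED, EventType.APP_ACCEPTED, app -> { });
        addTransition(State.ACCEPTED, State.RUNNING, EventType.ATTEMPT_REGISTERED, app -> { });
    }

    private State state = State.NEW;
    int attempts = 0;

    State getState() {
        return state;
    }

    /** Runs the hook registered for (state, type), then moves to the target state. */
    synchronized void handle(EventType type) {
        Map<EventType, Arc> arcs = TABLE.get(state);
        Arc arc = (arcs == null) ? null : arcs.get(type);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + type + " in state " + state);
        }
        arc.hook.accept(this);
        state = arc.target;
    }

    public static void main(String[] args) {
        MiniStateMachineSketch app = new MiniStateMachineSketch();
        app.handle(EventType.START);
        System.out.println(app.getState() + " attempts=" + app.attempts); // prints SUBMITTED attempts=1
    }
}
```

Keeping the table static means the topology is built once per class rather than once per application object, which matches the role of the static stateMachineFactory field in the listing above.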
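That completes the tour of the classes involved when an event is dispatched to a state machine and processed. In summary:
(1) AsyncDispatcher dispatches each event it receives to the matching EventDispatcher, according to the class of the event
(2) The EventDispatcher obtains the object A that handles this class of event and passes the event to A
(3) A calls the state machine's doTransition method, which determines the state change for this event type and the class that wraps the code to run
(4) That class, which implements the SingleArcTransition interface, has its transition method called to complete the state change
Because the state machine code looks much the same everywhere, the rest of this section uses each event as a heading and lists the state change, the classes and the actions involved.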
4) RMAppEventType.START
    EventDispatcher: ApplicationEventDispatcher
    Handler class: RMAppImpl
    State transition: RMAppState.NEW -> RMAppState.SUBMITTED
    Actions: create an RMAppAttemptImpl object with initial state RMAppAttemptState.NEW;
             trigger the RMAppAttemptEventType.START event
5) RMAppAttemptEventType.START
    EventDispatcher: ApplicationAttemptEventDispatcher
    Handler class: RMAppAttemptImpl
    State transition: RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED
    Actions: register the AppAttempt with ApplicationMasterService;
             trigger the AppAddedSchedulerEvent event
6) AppAddedSchedulerEvent
    EventDispatcher: SchedulerEventDispatcher
    Handler class: FifoScheduler/CapacityScheduler
    Actions: create the SchedulerApp object;
             trigger the RMAppAttemptEventType.APP_ACCEPTED event
7) RMAppAttemptEventType.APP_ACCEPTED
    EventDispatcher: ApplicationAttemptEventDispatcher
    Handler class: RMAppAttemptImpl
    State transition: RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED
    Actions: call ResourceScheduler.allocate() to request from the ResourceManager the Container needed to run the ApplicationMaster;
             trigger the RMAppEventType.APP_ACCEPTED event
8) RMAppEventType.APP_ACCEPTED
    EventDispatcher: ApplicationEventDispatcher
    Handler class: RMAppImpl
    State transition: RMAppState.SUBMITTED -> RMAppState.ACCEPTED
9) A NodeManager sends a heartbeat to the ResourceManager
10) On receiving the heartbeat, the ResourceManager's ResourceTrackerService triggers an RMNodeStatusEvent, which wraps RMNodeEventType.STATUS_UPDATE
11) RMNodeEventType.STATUS_UPDATE
    EventDispatcher: NodeEventDispatcher
    Handler class: RMNodeImpl
    State transition: RMNodeState.RUNNING -> RMNodeState.RUNNING
    Actions: update the node's health status;
             trigger the NodeUpdateSchedulerEvent event
12) NodeUpdateSchedulerEvent
    EventDispatcher: SchedulerEventDispatcher
    Handler class: FifoScheduler/CapacityScheduler
    Actions: create the SchedulerApp object and call assignContainers to assign a container to the application (the container has not actually been handed out at this point);
             trigger the RMContainerEventType.START event
13) RMContainerEventType.START
    EventDispatcher: NodeEventDispatcher
    Handler class: RMContainerImpl
    State transition: RMContainerState.NEW -> RMContainerState.ALLOCATED
    Actions: trigger the RMAppAttemptContainerAllocatedEvent
             (RMAppAttemptEventType.CONTAINER_ALLOCATED) event
14) RMAppAttemptEventType.CONTAINER_ALLOCATED
    EventDispatcher: ApplicationAttemptEventDispatcher
    Handler class: RMAppAttemptImpl
    State transition: RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED
    Actions: call the Scheduler's allocate function to obtain a container;
             trigger the AMLauncherEventType.LAUNCH event
15) AMLauncherEventType.LAUNCH
    Handler class: ApplicationMasterLauncher
    State transition: none; the attempt stays in RMAppAttemptState.ALLOCATED, which it reached in step 14)
    Actions: create an AMLauncher object and add it to the masterEvents queue;
             LauncherThread keeps taking entries from masterEvents, processes them and calls
             AMLauncher.launch();
             AMLauncher.launch() calls ContainerManager.startContainer() to create the
             container;
             at the same time the RMAppAttemptEventType.LAUNCHED event is triggered.
16) RMAppAttemptEventType.LAUNCHED
    EventDispatcher: ApplicationAttemptEventDispatcher
    Handler class: RMAppAttemptImpl
    State transition: RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED
    Actions: register with AMLivelinessMonitor so that the state of the Application can be monitored continuously
17) The ApplicationMaster, now running in a container on the NodeManager that sent the heartbeat in step 9), calls
    AMRMProtocol.registerApplicationMaster() to register with ApplicationMasterService
18) ApplicationMasterService.registerApplicationMaster()
    Read the ApplicationAttempt's details from the request
    Trigger the RMAppAttemptEventType.REGISTERED event
    Return the response to the caller
19) RMAppAttemptEventType.REGISTERED
    EventDispatcher: ApplicationAttemptEventDispatcher
    Handler class: RMAppAttemptImpl
    State transition: RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING
    Actions: record the information supplied at registration, such as the host, port and TrackingURL;
             trigger the RMAppEventType.ATTEMPT_REGISTERED event
20) RMAppEventType.ATTEMPT_REGISTERED
    EventDispatcher: ApplicationEventDispatcher
    Handler class: RMAppImpl
    State transition: RMAppState.ACCEPTED -> RMAppState.RUNNING
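At this point the ApplicationMaster (MRAppMaster) has been created. From here on it takes over the running of the Application: it requests from the ResourceManager the resources needed to run the subtasks, monitors the state of those subtasks, and reports the Application's state back to the ResourceManager.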
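As a cross-check, the state changes listed in the numbered steps above can be replayed with a small lookup table. The sketch below copies only the transitions named in this section. The class SubmissionTraceSketch, the "state/event" string keys and the helper names are hypothetical, and the real state machines contain many more transitions (failure, kill, finish and so on).

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SubmissionTraceSketch {
    // "currentState/eventType" -> next state, taken from the steps in this section.
    private static final Map<String, String> APP = new LinkedHashMap<>();
    private static final Map<String, String> ATTEMPT = new LinkedHashMap<>();

    static {
        APP.put("NEW/START", "SUBMITTED");
        APP.put("SUBMITTED/APP_ACCEPTED", "ACCEPTED");
        APP.put("ACCEPTED/ATTEMPT_REGISTERED", "RUNNING");

        ATTEMPT.put("NEW/START", "SUBMITTED");
        ATTEMPT.put("SUBMITTED/APP_ACCEPTED", "SCHEDULED");
        ATTEMPT.put("SCHEDULED/CONTAINER_ALLOCATED", "ALLOCATED");
        ATTEMPT.put("ALLOCATED/LAUNCHED", "LAUNCHED");
        ATTEMPT.put("LAUNCHED/REGISTERED", "RUNNING");
    }

    /** Looks up the next state, failing loudly on a transition the table does not list. */
    static String step(Map<String, String> table, String state, String event) {
        String next = table.get(state + "/" + event);
        if (next == null) {
            throw new IllegalStateException("No transition for " + event + " in state " + state);
        }
        return next;
    }

    /** Replays the events in the order of the steps; returns {appState, attemptState}. */
    static String[] replay() {
        String app = "NEW";
        String attempt = "NEW";
        app = step(APP, app, "START");
        attempt = step(ATTEMPT, attempt, "START");
        attempt = step(ATTEMPT, attempt, "APP_ACCEPTED");
        app = step(APP, app, "APP_ACCEPTED");
        attempt = step(ATTEMPT, attempt, "CONTAINER_ALLOCATED");
        attempt = step(ATTEMPT, attempt, "LAUNCHED");
        attempt = step(ATTEMPT, attempt, "REGISTERED");
        app = step(APP, app, "ATTEMPT_REGISTERED");
        return new String[] { app, attempt };
    }

    public static void main(String[] args) {
        String[] result = replay();
        System.out.println("RMApp=" + result[0] + " RMAppAttempt=" + result[1]);
        // prints RMApp=RUNNING RMAppAttempt=RUNNING
    }
}
```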