`
x-rip
  • 浏览: 106931 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

YARN/MRv2 ResourceManager端 源码分析1

 
阅读更多

2. ResourceManager端

Client端通过YarnRunner.submitJob()将Application提交给了ResourceManager。

连接Client与ResourceManager的协议为ClientRMProtocol,该协议的实现类为ClientRMService。

1) ClientRMService.java

Client端与ResourceManager交互的所有操作最终都是由ClientRMService中的操作实现的。以submitApplication()为例。

 

		  public SubmitApplicationResponse submitApplication(
			  SubmitApplicationRequest request) throws YarnRemoteException {
			// 获取Application的相关信息,包括上下文、id、用户
			ApplicationSubmissionContext submissionContext = request
				.getApplicationSubmissionContext();
			ApplicationId applicationId = submissionContext.getApplicationId();
			String user = submissionContext.getUser();
			try {
			  user = UserGroupInformation.getCurrentUser().getShortUserName();
			  if (rmContext.getRMApps().get(applicationId) != null) {
				throw new IOException("Application with id " + applicationId
					+ " is already present! Cannot add a duplicate!");
			  }
			  submissionContext.setUser(user);
			// 通过上下文构造RMAppManagerSubmitEvent,并调用RMAppManger的handle方法进行处理
			  rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
				  .currentTimeMillis()));
			  LOG.info("Application with id " + applicationId.getId() + 
				  " submitted by user " + user + " with " + submissionContext);
			  RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
				  "ClientRMService", applicationId);
			} catch (IOException ie) {
			  ...
			}
			SubmitApplicationResponse response = recordFactory
				.newRecordInstance(SubmitApplicationResponse.class);
			return response;
		 }
 

 

  2) RMAppmanager.java

RMAppManager实现了EventHandler接口,代表该类是用于处理某种事件的

 

		  public void handle(RMAppManagerEvent event) {
			ApplicationId applicationId = event.getApplicationId();
			LOG.debug("RMAppManager processing event for " 
				+ applicationId + " of type " + event.getType());
			// 由event.getType()可以看出,该类用于处理Application的提交和完成事件
			switch(event.getType()) {
			  case APP_COMPLETED: 
			  {
				finishApplication(applicationId);
				ApplicationSummary.logAppSummary(
					rmContext.getRMApps().get(applicationId));
				checkAppNumCompletedLimit(); 
			  } 
			  break;
			  case APP_SUBMIT:
			  {
				ApplicationSubmissionContext submissionContext = 
					((RMAppManagerSubmitEvent)event).getSubmissionContext();
				long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
				// 这里调用了submitApplication函数去向ResourceManager提交Job
				submitApplication(submissionContext, submitTime);
			  }
			  break;
			  default:
				LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
			  }
		  }
		  protected synchronized void submitApplication(
			  ApplicationSubmissionContext submissionContext, long submitTime) {
			ApplicationId applicationId = submissionContext.getApplicationId();
			RMApp application = null;
			try {
			// 从传进来的context中获取Application的相关参数,并对没有赋值的参数添加默认值
			  String clientTokenStr = null;
			  ...
			  ...
			// 存储Application的相关信息用于在Application出错或者挂掉时恢复
			  ApplicationStore appStore = rmContext.getApplicationsStore()
				  .createApplicationStore(submissionContext.getApplicationId(),
				  submissionContext);
			// 创建ResourceManager用于封装Application的RMAppImpl对象
			  application = new RMAppImpl(applicationId, rmContext,
				  this.conf, submissionContext.getApplicationName(), user,
				  submissionContext.getQueue(), submissionContext, clientTokenStr,
				  appStore, this.scheduler,
				  this.masterService, submitTime);
			// 判断是否重复提交相同的Application
			  if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
				  null) {
				String message = "Application with id " + applicationId
					+ " is already present! Cannot add a duplicate!";
				LOG.info(message);
				throw RPCUtil.getRemoteException(message);
			  } 

			// 通知ACLsManager
			  this.applicationACLsManager.addApplication(applicationId,
				  submissionContext.getAMContainerSpec().getApplicationACLs());

			// 安全令牌
			  if (UserGroupInformation.isSecurityEnabled()) {
				this.rmContext.getDelegationTokenRenewer().addApplication(
					applicationId,parseCredentials(submissionContext),
					submissionContext.getCancelTokensWhenComplete()
					);
			  }      
			// 向AsyncDispatcher发送RMAppEventType.START事件,ApplicationEventDispatcher接到AsyncDispatcher分发来的事件并交由RMAppImpl处理
			  this.rmContext.getDispatcher().getEventHandler().handle(
				  new RMAppEvent(applicationId, RMAppEventType.START));
			} catch (IOException ie) {
				LOG.info("RMAppManager submit application exception", ie);
				if (application != null) {
				// 发送RMAppRejectedEvent事件
				  this.rmContext.getDispatcher().getEventHandler().handle(
					  new RMAppRejectedEvent(applicationId, ie.getMessage()));
				}
			}
		  }
 

 

3) EventHandler

自此AsyncDispatcher将接管之后所有的事件分发,所有事件都将由AsyncDispatcher分发给对应的EventDispatcher。EventDispatcher会初始化处理该事件的类,并将事件交给创建的类来进行处理。以RMAppEventType.START事件为例,该类将分发给ApplicationEventDispatcher,然后由ApplicationEventDispatcher初始化RMApp的实现类RMAppImpl来处理。

 

		  public static final class ApplicationEventDispatcher implements
			  EventHandler<RMAppEvent> {
			private final RMContext rmContext;
			public ApplicationEventDispatcher(RMContext rmContext) {
			  this.rmContext = rmContext;
			}
			@Override
			public void handle(RMAppEvent event) {
			  ApplicationId appID = event.getApplicationId();
			// 初始化处理对应事件的类
			  RMApp rmApp = this.rmContext.getRMApps().get(appID);
			  if (rmApp != null) {
				try {
				// 将事件交由对应类处理
				  rmApp.handle(event);
				} catch (Throwable t) {
				  LOG.error("Error in handling event type " + event.getType()
					  + " for application " + appID, t);
				}
			  }
			}
		  }
 

 

 RMAppImpl.java

 

			  public void handle(RMAppEvent event) {
				// 为更新状态机加锁
				this.writeLock.lock();
				try {
				  ApplicationId appID = event.getApplicationId();
				  LOG.debug("Processing event for " + appID + " of type "
					  + event.getType());
				  final RMAppState oldState = getState();
				  try {
					// 由状态机处理该事件
					this.stateMachine.doTransition(event.getType(), event);
				  } catch (InvalidStateTransitonException e) {
					...
				  }
				} finally {
				// 解锁
				  this.writeLock.unlock();
				}
			  }
 

 

状态机的工作方式如下,以RMAppImpl中的状态机为例

 

		/* 泛型参数从左到右依次为
			执行状态变换的类、封装状态的类、封装事件类型的类、封装事件的类
		   构造函数参数为该状态机的起始状态 */
		  private static final StateMachineFactory<RMAppImpl,
								   RMAppState,
								   RMAppEventType,
								   RMAppEvent> stateMachineFactory
					   = new StateMachineFactory<RMAppImpl,
								   RMAppState,
								   RMAppEventType,
								   RMAppEvent>(RMAppState.NEW)
						/* 添加状态之间的变换以及变换时的需要进行的操作的封装类
						   以下即表示状态从RMAppState.NEW -> RMAppState.SUBMITTED
						   的触发事件类型为RMAppEventType.START事件
						   需要执行的方法被封装在StartAppAttemptTransition类中 */
						.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
							RMAppEventType.START, new StartAppAttemptTransition())
						...
						...
						// 构建状态机
						.installTopology();
		// 封装执行状态变化所需的方法的类需要实现SingleArcTransition接口,以StartAppAttemptTransition为例
		// 每个状态机都会对应一个实现了SingleArcTransition接口的类,在这里为RMAppTransition
		// StartAppAttemptTransition通过继承RMAppTransition并实现transition方法,在该方法中实现状态变化的处理逻辑
		  private static final class StartAppAttemptTransition extends RMAppTransition {
			public void transition(RMAppImpl app, RMAppEvent event) {
			  app.createNewAttempt();
			};
		  }
 

 

至此,一个典型的由状态机分发事件并进行处理的相关类介绍完毕。总结如下:

(1) AsyncDispatcher根据接收到的事件按它的类分发给相应的EventDispatcher

(2) EventDispatcher初始化处理该类事件的类A,并将事件传递给A

(3) A调用状态机的doTransition方法确定该事件类型对应的状态变化和封装了需要执行的方法的类

(4) 实现了SingleArcTransition接口的类,将调用transition方法完成状态变换

由于状态机处理代码大都相同,以下将以事件为标题来描述状态变换和涉及的类和操作

4) RMAppEventType.START

EventDispatcher: ApplicationEventDispatcher

事件处理类: RMAppImpl

状态更新: RMAppState.NEW -> RMAppState.SUBMITTED

所需操作: 创建RMAppAttemptImpl对象,初始化其状态为RMAppAttemptState.NEW

触发RMAppAttemptEventType.START事件

5) RMAppAttemptEventType.START

EventDispatcher: ApplicationAttemptEventDispatcher

事件处理类: RMAppAttemptImpl

状态更新: RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED

所需操作: 向ApplicationMasterService注册该AppAttempt

触发AppAddedSchedulerEvent事件

6) AppAddedSchedulerEvent

EventDispatcher: SchedulerEventDispatcher

事件处理类: FifoScheduler/CapacityScheduler

所需操作: 创建SchedulerApp对象

触发RMAppAttemptEventType.APP_ACCEPTED事件

7) RMAppAttemptEventType.APP_ACCEPTED

EventDispatcher: ApplicationAttemptEventDispatcher

事件处理类: RMAppAttemptImpl

状态更新: RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED

所需操作: 调用ResourceScheduler的allocate函数,向ResourceManager申请运行                                                             ApplicationMaster需要的Container

触发RMAppEventType.APP_ACCEPTED事件

8) RMAppEventType.APP_ACCEPTED

EventDispatcher: ApplicationEventDispatcher

事件处理类: RMAppImpl

状态更新: RMAppState.SUBMITTED -> RMAppState.ACCEPTED

9) 某个NodeManager向ResourceManager发送心跳

      10) ResourceManager的ResourceTrackerService收到心跳信息后触发封装了                                                            RMNodeEventType.STATUS_UPDATE的RMNodeStatusEvent事件

      11) RMNodeEventType.STATUS_UPDATE

EventDispatcher: ApplicationEventDispatcher

事件处理类: RMNodeImpl

状态更新: RMNodeState.RUNNING -> RMNodeState.RUNNING

所需操作: 更新节点的健康状态

触发NodeUpdateSchedulerEvent事件

      12) NodeUpdateSchedulerEvent

EventDispatcher: SchedulerEventDispatcher

事件处理类: FifoScheduler/CapacityScheduler

所需操作: 创建SchedulerApp对象,调用assginContainers为该application分配一个                                                         container,此时还未真正分配

触发RMContainerEventType.START事件

      13) RMContainerEventType.START

EventDispatcher: NodeEventDispatcher

事件处理类: RMContainerImpl

状态更新: RMContainerState.NEW -> RMContainerState.ALLOCATED

所需操作: 触发RMAppAttemptContainerAllocatedEvent

                                                (RMAppAttemptEventType.CONTAINER_ALLOCATED)事件

      14) RMAppAttemptEventType.CONTAINER_ALLOCATED

EventDispatcher: ApplicationAttemptEventDispatcher

事件处理类: RMAppAttemptImpl

状态更新: RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED

所需操作: 调用Scheduler的allocate函数申请一个container

触发AMLauncherEventType.LAUNCH事件

      15) AMLauncherEventType.LAUNCH

事件处理类: ApplicationMasterLauncher

状态更新: RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED

所需操作: 创建AMLauncher对象,并将其添加到队列masterEvents中;

LauncherThread不断从masterEvents取出,进行处理,并调用

                                                AMLauncher.launch()函数;

AMLancher.launch()调用ContainerManager.startContainer()函数创建

                                                container;

同时触发RMAppAttemptEventType.LAUNCHED事件。

      16) RMAppAttemptEventType.LAUNCHED

EventDispatcher: ApplicationAttemptEventDispatcher

事件处理类: RMAppAttemptImpl

状态更新: RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED

所需操作: 向AMLivelinessMonitor注册,用于实时监控该Application的状态

      17) 第9)步中向ResourceManager发送心跳的NodeManager,调用

             AMRMProtocol.registerApplicationMaster()向ApplicationMasterService进行注册

      18) ApplicationMasterService.registerApplicationMaster()

从request中获取ApplicationAttempt的相关信息

触发RMAppAttemptEventType.REGISTERED事件

返回给调用端response

      19) RMAppAttemptEventType.REGISTERED

EventDispatcher: ApplicationAttemptEventDispatcher

事件处理类: RMAppAttemptImpl

状态更新: RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING

所需操作: 设置Application注册后的信息,如运行的host、端口、TrackingURL等

触发RMAppEventType.ATTEMPT_REGISTERED事件

      20) 触发RMAppEventType.REGISTERED

EventDispatcher: ApplicationAttemptEventDispatcher

事件处理类: RMAppAttemptImpl

状态更新: RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING

所需操作: 设置Application注册后的信息,如运行的host、端口、TrackingURL等

触发RMAppEventType.ATTEMPT_REGISTERED事件

      21) RMAppEventType.ATTEMPT_REGISTERED

EventDispatcher: ApplicationEventDispatcher

事件处理类: RMAppImpl

状态更新: RMAppState.ACCEPTED -> RMAppState.RUNNING   

至此,ApplicationMaster(MRAppMaster)创建完毕,之后Application的运行将由ApplicationMaster(MRAppMaster)接管,它将负责向ResourceManager申请运行子任务所需的资源,监控子任务的运行状态,并向ResourceManager汇报Application的运行状态


 

分享到:
评论

相关推荐

    YARN Essentials.PDF

    YARN 在 Hadoop 2.x 版本中被引入,取代了 Hadoop 1.x 中的 MapReduce v1 框架。 **1. 重新设计的理念:** - **解决 MapReduce v1 的局限性:** 原始的 MapReduce 设计仅限于支持 Map 和 Reduce 这两种操作,这限制...

    YARN.Essentials

    这里首先介绍了Hadoop 2.x的发布,对Hadoop 1.x及其MapReduce API的简要介绍,以及MRv1与MRv2的对比。接着,文档讨论了YARN在Hadoop中的作用,以及MapReduce API的向后兼容性和org.apache.hadoop.mapred API的二进制...

    Hadoop YARN 基本架构和发展趋势 - d.pdf

    Hadoop YARN(Yet Another Resource Negotiator)是Apache Hadoop项目中的一个核心组件,它的设计目标是解决早期Hadoop 1.x版本中MapReduce(MRv1)存在的问题,包括扩展性限制、单点故障以及对其他计算框架支持不足...

    Hadoop Reduce Join及基于MRV2 API 重写

    MRV2,也称为YARN(Yet Another Resource Negotiator),是Hadoop 2.x引入的新一代MapReduce框架,旨在提高资源管理和应用程序的灵活性。在MRV2中,MapReduce任务被分解为两个独立的组件:ApplicationMaster和...

    MapReduceV1和V2的API区别

    随着时间的推移,MapReduce经历了两次主要的版本升级,即MapReduce V1(MRv1)和MapReduce V2(MRv2,也称为YARN)。这两个版本在API设计和执行模型上有所不同,影响了开发人员的工作流程和系统性能。下面将详细讨论...

    HadoopYARN大数据计算框架及其资源调度机制研究

    为了解决这些问题,Hadoop 2.0版本引入了一种新的资源管理系统YARN,即MRv2。YARN是一种通用的资源调度体系,它支持多种计算框架(如MapReduce、Spark、Storm等),让它们能够在同一个集群上运行。YARN的核心是资源...

    Yarn知名培训私密资料

    在Hadoop 2.x版本中引入,YARN的目标是解决原MapReduce框架(MRv1)的不足,提高集群资源的利用率和整体性能。YARN通过将资源管理和任务调度功能分离,实现了更高效、灵活的集群资源分配。 【原MapReduce框架的不足...

    MapReduce技术揭秘.pptx

    YARN/MRv2架构中,ResourceManager负责全局资源管理和调度,而ApplicationMaster专注于任务调度和监视。此外,NodeManager在每个节点上管理任务执行,Container则作为资源分配的基本单位,考虑了CPU和内存等资源的...

    YARN 基础架构,工作机制,任务调度器

    YARN(Yet Another Resource Negotiator)是 Hadoop 2.0 中的资源管理系统,它的设计思想是将 MRv1 中的 JobTracker 拆分成两个独立的服务:一个全局的资源管理器 ResourceManager 和每个应用程序持有的 ...

    Hadoop Yarn详解

    在YARN出现之前,Hadoop主要依赖于MRv1(MapReduce版本1)架构来处理任务。MRv1架构将Hadoop集群分为两大抽象实体:MapReduce引擎和分布式文件系统HDFS。MapReduce引擎负责执行Map和Reduce任务并报告结果,而HDFS则...

    hadoop和yarn原理笔记.docx

    在大数据处理中,除了Hadoop MapReduce(MRv2)外,还有其他计算框架如Storm(实时流处理)、Spark(内存计算)和Impala(快速查询)。这些框架各有优势,如Spark通过内存计算大幅提高处理速度,而Impala利用自己的...

    Hadoop 体系架构.doc

    YARN(MRv2)是 Hadoop 的新架构,旨在解决 MRv1 的问题。YARN 采用了一种分层的集群框架方法,将特定于 MapReduce 的功能已替换为一组新的守护程序,将该框架向新的处理模型开放。 YARN 的新架构 YARN 的新架构...

    Hadoop体系架构.docx

    Hadoop 体系架构 Hadoop 体系架构是大数据处理的核心组件之一。...YARN 是 Hadoop 的新架构,旨在解决 MRv1 的问题,提供了一种分层的集群框架方法,能够支持 MRv2 和其他使用不同处理模型的应用程序。

    Hadoop 2.x单节点部署学习。

    此外,`yarn.nodemanager.aux-services`用于设置MapReduce所需的辅助服务,如MRv2 shuffle service。 `hdfs-site.xml`是HDFS(Hadoop Distributed File System)的配置文件,它定义了HDFS的行为,如副本数量、块...

    Hadoop-2.0中单点故障项目解决方案总结.doc

    最后,对于MapReduce本身的单点故障问题,Hadoop 2.0引入了两种MapReduce实现:MRv1和MRv2(又称YARN上的MapReduce)。在MRv1中,JobTracker是单点故障,但在MRv2中,JobTracker被替换为ApplicationMaster,每个应用...

    YARN框架原理及运行机制

    YARN是Hadoop2.0中的资源管理系统,它的设计思想是将MRv1中的JobTracker拆分成两个独立的服务:一个全局的资源管理器ResourceManager和每个应用程序持有的ApplicationMaster。其中RM负责整个系统的资源管理和分配,...

    hadoop2.7.5源码

    5. **MapReduce2(MRv2)**:Hadoop 2.x中的MapReduce改进版,与YARN紧密集成,提供了更好的资源管理和任务调度能力。 6. **Hadoop命令行工具**:源码中还包括用于操作HDFS和运行MapReduce作业的命令行工具,如`...

Global site tag (gtag.js) - Google Analytics