1. Foreword
As noted in the previous articles, YARN/MRv2 is a unified resource management system on which a variety of computing frameworks can run. Since the client side of every framework is written in much the same way, this article uses the client code of the MapReduce framework as an example.
2. Two Related Protocols
Submitting a job involves two protocols:
ClientProtocol: in Hadoop (MRv1), the JobClient submits jobs to the JobTracker through this protocol.
ClientRMProtocol: in YARN, the client submits jobs to the ResourceManager through this protocol.
3. Client Design
To migrate Hadoop MapReduce onto YARN seamlessly, the client must use both protocols at the same time. The approach taken is:
[the "inheritance + composition" design pattern]
A new class, YARNRunner, implements the ClientProtocol interface and holds a ClientRMProtocol object as an internal member. When a user submits a job, YARNRunner's submitJob method is invoked directly; inside it, ClientRMProtocol's submitApplication is called to hand the job over to the ResourceManager. This submitApplication is actually an RPC method, implemented by the ResourceManager.
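This delegation structure can be sketched with simplified stand-in interfaces. The names below mirror the text, but the method signatures and bodies are hypothetical stand-ins for illustration, not the real Hadoop code:

```java
// Old MRv1-facing interface (simplified stand-in)
interface ClientProtocol {
    String submitJob(String jobId);
}

// YARN-facing RPC interface (simplified stand-in)
interface ClientRMProtocol {
    String submitApplication(String appContext);
}

public class YARNRunnerSketch implements ClientProtocol {
    // Composition: the YARN protocol proxy is an internal member
    private final ClientRMProtocol rmClient;

    public YARNRunnerSketch(ClientRMProtocol rmClient) {
        this.rmClient = rmClient;
    }

    @Override
    public String submitJob(String jobId) {
        // submitJob keeps the old MRv1-style entry point but forwards
        // the work to the ResourceManager via ClientRMProtocol
        return rmClient.submitApplication("context-for-" + jobId);
    }

    public static void main(String[] args) {
        // A fake RM stands in for the real RPC proxy
        ClientRMProtocol fakeRM = ctx -> "accepted:" + ctx;
        ClientProtocol runner = new YARNRunnerSketch(fakeRM);
        System.out.println(runner.submitJob("job_001"));
        // prints accepted:context-for-job_001
    }
}
```

The point of the pattern is that existing MRv1 callers keep talking to ClientProtocol, while all real work is delegated to the YARN-side protocol held as a member.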
Let's look at the main methods of the ClientRMProtocol interface:
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnRemoteException;
Submits a new application to the ResourceManager. When calling this method, the client must specify in the request the queue the application goes to, the jar containing the ApplicationMaster and how to launch it, and so on.
public KillApplicationResponse forceKillApplication(
    KillApplicationRequest request) throws YarnRemoteException;
Asks the ResourceManager to kill a given application.
public GetApplicationReportResponse getApplicationReport(
    GetApplicationReportRequest request) throws YarnRemoteException;
Queries the ResourceManager for information about a given application, such as its id, user, and timing information.
4. End-to-End Flow
The client first obtains a new ApplicationId via ClientRMProtocol#getNewApplication, then submits the application via ClientRMProtocol#submitApplication. When calling ClientRMProtocol#submitApplication, the client must give the ResourceManager enough information to launch the first container (which is, in fact, the ApplicationMaster): the files and jars the application needs, the command to execute those jars, Unix environment settings, and so on.
After that, the ResourceManager allocates a container and launches the ApplicationMaster in it. The ApplicationMaster then communicates with the ResourceManager via AMRMProtocol and with the NodeManagers via ContainerManager to request resources and launch containers.
In detail:
(1) The client establishes a connection to the ResourceManager, or more precisely, to the ResourceManager's ApplicationsManager (ASM).
YarnRPC rpc = YarnRPC.create(this.conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(
    this.conf.get(YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS),
    YarnConfiguration.DEFAULT_RM_PORT,
    YarnConfiguration.RM_ADDRESS);
LOG.info("Connecting to ResourceManager at " + rmAddress);
applicationsManager = (ClientRMProtocol) rpc.getProxy(
    ClientRMProtocol.class, rmAddress, this.conf);
(2) Once it holds a handle to the ASM, the client asks the ResourceManager to allocate a new ApplicationId.
GetNewApplicationRequest request =
    recordFactory.newRecordInstance(GetNewApplicationRequest.class);
GetNewApplicationResponse response =
    applicationsManager.getNewApplication(request);
LOG.info("Got new ApplicationId=" + response.getApplicationId());
(3) The response returned by the ASM also contains cluster information, such as the minimum and maximum resource capabilities of the cluster, which helps in choosing a sensible resource requirement for the ApplicationMaster. See the GetNewApplicationResponse class for details.
The client's most important task is to set up the ApplicationSubmissionContext object, which defines everything the ResourceManager needs to launch the ApplicationMaster. The client must set the following information in this context:
[1] Queue and priority: which queue the application will be submitted to, and what its priority is.
[2] User: which user is submitting the application; this is mainly used for access control.
[3] ContainerLaunchContext: information about the container that will launch and run the ApplicationMaster, including local resources (binaries, jars, files, etc.), security tokens, environment settings (CLASSPATH, etc.), and the launch command.
// Create a new ApplicationSubmissionContext
ApplicationSubmissionContext appContext =
    Records.newRecord(ApplicationSubmissionContext.class);
// set the ApplicationId
appContext.setApplicationId(appId);
// set the application name
appContext.setApplicationName(appName);

// Create a new container launch context for the AM's container
ContainerLaunchContext amContainer =
    Records.newRecord(ContainerLaunchContext.class);

// Define the local resources required
Map<String, LocalResource> localResources =
    new HashMap<String, LocalResource>();
// Lets assume the jar we need for our ApplicationMaster is available in
// HDFS at a certain known path to us and we want to make it available to
// the ApplicationMaster in the launched container
Path jarPath; // <- known path to jar file
FileStatus jarStatus = fs.getFileStatus(jarPath);
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
// Set the type of resource - file or archive
// archives are untarred at the destination by the framework
amJarRsrc.setType(LocalResourceType.FILE);
// Set visibility of the resource
// Setting to most private option i.e. this file will only
// be visible to this instance of the running application
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
// Set the location of resource to be copied over into the
// working directory
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
// Set timestamp and length of file so that the framework
// can do basic sanity checks for the local resource
// after it has been copied over to ensure it is the same
// resource the client intended to use with the application
amJarRsrc.setTimestamp(jarStatus.getModificationTime());
amJarRsrc.setSize(jarStatus.getLen());
// The framework will create a symlink called AppMaster.jar in the
// working directory that will be linked back to the actual file.
// The ApplicationMaster, if it needs to reference the jar file, would
// need to use the symlink filename.
localResources.put("AppMaster.jar", amJarRsrc);
// Set the local resources into the launch context
amContainer.setLocalResources(localResources);

// Set up the environment needed for the launch context
Map<String, String> env = new HashMap<String, String>();
// For example, we could set up the classpath needed.
// Assuming our classes or jars are available as local resources in the
// working directory from which the command will be run, we need to append
// "." to the path.
// By default, all the hadoop specific classpaths will already be available
// in $CLASSPATH, so we should be careful not to overwrite it.
String classPathEnv = "$CLASSPATH:./*:";
env.put("CLASSPATH", classPathEnv);
amContainer.setEnvironment(env);

// Construct the command to be executed on the launched container
String command =
    "${JAVA_HOME}" + "/bin/java" +
    " MyAppMaster" +
    " arg1 arg2 arg3" +
    " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
    " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
List<String> commands = new ArrayList<String>();
commands.add(command);
// add additional commands if needed
// Set the command array into the container spec
amContainer.setCommands(commands);

// Define the resource requirements for the container.
// For now, YARN only supports memory so we set the memory requirements.
// If the process takes more than its allocated memory, it will
// be killed by the framework.
// Memory being requested for should be less than max capability
// of the cluster and all asks should be a multiple of the min capability.
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
amContainer.setResource(capability);

// Set the container launch context into the ApplicationSubmissionContext
appContext.setAMContainerSpec(amContainer);
(4) After that, the client can submit the application to the ASM:
// Create the request to send to the ApplicationsManager
SubmitApplicationRequest appRequest =
    Records.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
// Submit the application to the ApplicationsManager
// Ignore the response as either a valid response object is returned on
// success or an exception thrown to denote the failure
applicationsManager.submitApplication(appRequest);
(5) At this point the ResourceManager will have accepted the application and, based on its resource requirements, will allocate a container and eventually launch the ApplicationMaster in it. The client has several ways to track the progress of the actual job: it can call ClientRMProtocol#getApplicationReport to fetch a report of the application's current state from the ResourceManager.
GetApplicationReportRequest request =
    recordFactory.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId);
GetApplicationReportResponse response =
    applicationsManager.getApplicationReport(request);
ApplicationReport applicationReport = response.getApplicationReport();
An ApplicationReport obtained from the ResourceManager contains:
[1] General application information, such as the ApplicationId, the queue the application is in, and the user who submitted it.
[2] ApplicationMaster information: the host the ApplicationMaster runs on, the RPC port it listens on for client requests, and the token the client needs to communicate with it.
[3] Application tracking information: if the application supports progress tracking, it can set a tracking URL through which the client can obtain progress directly.
[4] ApplicationStatus: via ApplicationReport#getYarnApplicationState the client can obtain the application's current state from the ResourceManager. If that state is FINISHED, the client should call ApplicationReport#getFinalApplicationStatus to check whether the application succeeded or failed; on failure, ApplicationReport#getDiagnostics provides detailed failure information.
[5] If the ApplicationMaster supports it, the client can query execution progress directly from the ApplicationMaster at host:rpcport, or use the tracking URL mentioned above.
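The status-checking sequence in [4] above can be sketched as a simple polling loop. Since the real getApplicationReport call needs a live cluster, the YARN record types are replaced here by minimal stand-in classes; this is an illustrative sketch, not the real client code:

```java
import java.util.Arrays;
import java.util.Iterator;

public class AppStatePoller {
    // Stand-ins for YarnApplicationState and FinalApplicationStatus
    enum State { NEW, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED }
    enum FinalStatus { UNDEFINED, SUCCEEDED, FAILED, KILLED }

    // Stand-in for ApplicationReport, holding only the two fields the loop reads
    static class Report {
        final State state;
        final FinalStatus finalStatus;
        Report(State state, FinalStatus finalStatus) {
            this.state = state;
            this.finalStatus = finalStatus;
        }
    }

    // Consume successive reports (in real code each iteration would call
    // applicationsManager.getApplicationReport) until the application
    // reaches FINISHED, then return its final status.
    static FinalStatus waitForCompletion(Iterator<Report> reports) {
        while (reports.hasNext()) {
            Report r = reports.next();
            if (r.state == State.FINISHED) {
                return r.finalStatus;
            }
        }
        return FinalStatus.UNDEFINED;
    }

    public static void main(String[] args) {
        // Fake sequence of reports standing in for repeated RPC calls
        Iterator<Report> fake = Arrays.asList(
                new Report(State.RUNNING, FinalStatus.UNDEFINED),
                new Report(State.FINISHED, FinalStatus.SUCCEEDED)).iterator();
        System.out.println(waitForCompletion(fake));
        // prints SUCCEEDED
    }
}
```

In a real client the loop body would sleep between calls and, per [4], inspect getDiagnostics when the final status indicates failure.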