x-rip (Hangzhou)

YARN/MRv2 Client-Side Source Code Analysis

 

1. The Client

1) Job.java

A user's MapReduce program submits its job by calling Job.waitForCompletion(true).

 

		public boolean waitForCompletion(boolean verbose)
				throws IOException, InterruptedException, ClassNotFoundException {
			// Submit the job via submit()
			if (state == JobState.DEFINE) {
				submit();
			}
			if (verbose) {
				// Poll the job's status in a loop and print it to the client console
				monitorAndPrintJob();
			} else {
				...
			}
			return isSuccessful();
		}

		public void submit()
				throws IOException, InterruptedException, ClassNotFoundException {
			// Verify the job state
			ensureState(JobState.DEFINE);
			// Use the new (YARN) API
			setUseNewAPI();
			// Initialize the Cluster used to query job status; it reads the
			// configuration to decide whether the job goes to YARN or to a JobTracker
			connect();
			// Build the JobSubmitter that performs the actual submission
			final JobSubmitter submitter =
				getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
			status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
				public JobStatus run() throws IOException, InterruptedException,
						ClassNotFoundException {
					return submitter.submitJobInternal(Job.this, cluster);
				}
			});
			// Update the job state
			state = JobState.RUNNING;
			LOG.info("The url to track the job: " + getTrackingURL());
		}
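The DEFINE → RUNNING guard above is worth noting: ensureState() makes a second submit() on the same Job object fail. A minimal, Hadoop-free sketch of that guard (MiniJob and its JobState enum are illustrative stand-ins, not the real Hadoop classes):

```java
// Simplified model of Job's state guard: submit() is only legal in DEFINE
// and flips the job to RUNNING exactly once (mirrors ensureState + submit()).
public class MiniJob {
    public enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;

    private void ensureState(JobState required) {
        if (state != required) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + required);
        }
    }

    public void submit() {
        ensureState(JobState.DEFINE);   // refuse double submission
        // ... connect(), build JobSubmitter, submitJobInternal() ...
        state = JobState.RUNNING;
    }

    public JobState getState() { return state; }

    public static void main(String[] args) {
        MiniJob job = new MiniJob();
        job.submit();
        System.out.println(job.getState());  // RUNNING
    }
}
```

Calling submit() a second time on the sketch throws IllegalStateException, just as the real Job would complain once it has left the DEFINE state.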
 

2) JobSubmitter.java

Jobs submitted through Job are handed over to JobSubmitter, which runs a number of checks and preparation steps before the actual submission.

 

		JobStatus submitJobInternal(Job job, Cluster cluster)
				throws ClassNotFoundException, InterruptedException, IOException {
			// Check that the job's output specification is valid
			checkSpecs(job);
			Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
					job.getConfiguration());
			// Record submission parameters, such as the submitting host
			Configuration conf = job.getConfiguration();
			InetAddress ip = InetAddress.getLocalHost();
			if (ip != null) {
				submitHostAddress = ip.getHostAddress();
				submitHostName = ip.getHostName();
				conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
				conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
			}
			// Obtain a new job id from the ClientProtocol implementation
			JobID jobId = submitClient.getNewJobID();
			job.setJobID(jobId);
			Path submitJobDir = new Path(jobStagingArea, jobId.toString());
			JobStatus status = null;
			// Write the job's jar, input split metadata and configuration to HDFS
			try {
				conf.set("hadoop.http.filter.initializers",
						"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
				conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
				LOG.debug("Configuring job " + jobId + " with " + submitJobDir
						+ " as the submit dir");
				TokenCache.obtainTokensForNamenodes(job.getCredentials(),
						new Path[] { submitJobDir }, conf);
				populateTokenCache(conf, job.getCredentials());
				copyAndConfigureFiles(job, submitJobDir);
				Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
				// Write the split information
				LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
				int maps = writeSplits(job, submitJobDir);
				conf.setInt(MRJobConfig.NUM_MAPS, maps);
				LOG.info("number of splits:" + maps);
				// Record the queue the job is submitted to
				String queue = conf.get(MRJobConfig.QUEUE_NAME,
						JobConf.DEFAULT_QUEUE_NAME);
				AccessControlList acl = submitClient.getQueueAdmins(queue);
				conf.set(toFullPropertyName(queue,
						QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
				// Remove the token referral exposed for user access
				TokenCache.cleanUpTokenReferral(conf);
				// Write the configuration
				writeConf(conf, submitJobFile);
				printTokens(jobId, job.getCredentials());
				// Submit the job through ClientProtocol.submitJob()
				status = submitClient.submitJob(
						jobId, submitJobDir.toString(), job.getCredentials());
				...
		}
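The heart of submitJobInternal is the per-job submit directory: everything the MRAppMaster later needs is written under `<staging>/<jobId>/`. A small sketch of that layout (StagingLayout and the staging root shown are illustrative; the real root comes from the staging-dir configuration and includes the user name):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch of the per-job submit directory: <staging>/<jobId>/...
// mirroring: Path submitJobDir = new Path(jobStagingArea, jobId.toString())
public class StagingLayout {
    public static Path submitJobDir(String stagingArea, String jobId) {
        return Paths.get(stagingArea, jobId);
    }

    public static void main(String[] args) {
        Path dir = submitJobDir("/tmp/hadoop-yarn/staging", "job_0001");
        // The submitter then writes, under this directory:
        //   job.xml   - the serialized Configuration (writeConf)
        //   job.split - the input split metadata (writeSplits)
        //   job.jar   - the user's program jar (copyAndConfigureFiles)
        System.out.println(dir.resolve("job.xml"));
    }
}
```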
 

3) YARNRunner.java

Under YARN, submitJob is handled by YARNRunner, the ClientProtocol implementation. It checks the runtime prerequisites, initializes everything needed to launch the MRAppMaster, and asks the ResourceManager for a container to run it in.

 

		public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
				throws IOException, InterruptedException {
			// If the HistoryServer is enabled, set up its delegation token
			MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
			if (hsProxy != null) {
				if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
						DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
					Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
							conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
					ts.addToken(hsDT.getService(), hsDT);
				}
			}
			// Persist the job's security tokens to the submit directory
			Path applicationTokensFile =
					new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
			try {
				ts.writeTokenStorageFile(applicationTokensFile, conf);
			} catch (IOException e) {
				throw new YarnException(e);
			}
			/* Initialize the application submission context:
				ApplicationId
				ApplicationName
				Queue: the queue the application is submitted to
				Priority: the application's priority
				User: the user running the MRAppMaster
				AMContainerSpec: the container that will run the ApplicationMaster
					ContainerId
					User: the user running the MRAppMaster
					Resource: the resources the ResourceManager allocates to the MRAppMaster
					ContainerToken: security tokens (in secure mode)
					LocalResources: the MRAppMaster jar, the job configuration,
						the job's program jar, the per-split metadata, etc.
					ServiceData:
					Environment: the MRAppMaster's classpath and other environment variables
					Commands: the command that launches the MRAppMaster, e.g.
						$JAVA_HOME/bin/java MRAppMaster.class.getName() ...
					ApplicationACLs: the MRAppMaster's access control lists
			*/
			ApplicationSubmissionContext appContext =
					createApplicationSubmissionContext(conf, jobSubmitDir, ts);

			// Submit to the ResourceManager via ResourceMgrDelegate
			ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
			// Fetch the MRAppMaster's status via ResourceMgrDelegate
			ApplicationReport appMaster = resMgrDelegate
					.getApplicationReport(applicationId);
			...
			return clientCache.getClient(jobId).getJobStatus(jobId);
		}
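The Commands entry of the AMContainerSpec is just a shell command line assembled on the client. A rough, Hadoop-free sketch of what createApplicationSubmissionContext puts together (AmCommand is an illustrative name; the real code also adds heap options from the configuration, the classpath environment, and uses MRJobConfig constants):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: assemble the MRAppMaster launch command carried in AMContainerSpec.
public class AmCommand {
    public static String build(String amClass, int heapMb) {
        List<String> parts = new ArrayList<>();
        parts.add("$JAVA_HOME/bin/java");   // expanded inside the container
        parts.add("-Xmx" + heapMb + "m");   // AM heap size
        parts.add(amClass);                 // main class of the ApplicationMaster
        parts.add("1><LOG_DIR>/stdout");    // redirect stdout to the container log dir
        parts.add("2><LOG_DIR>/stderr");    // redirect stderr likewise
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        System.out.println(build(
            "org.apache.hadoop.mapreduce.v2.app.MRAppMaster", 1024));
    }
}
```

The NodeManager later substitutes placeholders such as `<LOG_DIR>` before executing the command in the allocated container.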
 

4) ResourceMgrDelegate.java

ResourceMgrDelegate handles all communication with the ResourceManager and submits the request to launch the ApplicationMaster (MRAppMaster).

 

		// Connect to the ResourceManager
		public ResourceMgrDelegate(YarnConfiguration conf) {
			this.conf = conf;
			YarnRPC rpc = YarnRPC.create(this.conf);
			InetSocketAddress rmAddress =
					NetUtils.createSocketAddr(this.conf.get(
							YarnConfiguration.RM_ADDRESS,
							YarnConfiguration.DEFAULT_RM_ADDRESS),
							YarnConfiguration.DEFAULT_RM_PORT,
							YarnConfiguration.RM_ADDRESS);
			this.rmAddress = rmAddress.toString();
			LOG.debug("Connecting to ResourceManager at " + rmAddress);
			applicationsManager =
					(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
							rmAddress, this.conf);
			LOG.debug("Connected to ResourceManager at " + rmAddress);
		}

		// Obtain a new application id
		public JobID getNewJobID() throws IOException, InterruptedException {
			GetNewApplicationRequest request =
					recordFactory.newRecordInstance(GetNewApplicationRequest.class);
			applicationId =
					applicationsManager.getNewApplication(request).getApplicationId();
			// TypeConverter translates between YARN identifiers and the old
			// Hadoop job identifiers: JobId, ApplicationId, TaskId, etc.
			return TypeConverter.fromYarn(applicationId);
		}

		// Submit the application to the ResourceManager, which will allocate
		// a container to run the MRAppMaster
		public ApplicationId submitApplication(
				ApplicationSubmissionContext appContext)
				throws IOException {
			appContext.setApplicationId(applicationId);
			SubmitApplicationRequest request =
					recordFactory.newRecordInstance(SubmitApplicationRequest.class);
			request.setApplicationSubmissionContext(appContext);
			applicationsManager.submitApplication(request);
			LOG.info("Submitted application " + applicationId + " to ResourceManager" +
					" at " + rmAddress);
			return applicationId;
		}

		// Ask the ResourceManager for the application's status
		public ApplicationReport getApplicationReport(ApplicationId appId)
				throws YarnRemoteException {
			GetApplicationReportRequest request = recordFactory
					.newRecordInstance(GetApplicationReportRequest.class);
			request.setApplicationId(appId);
			GetApplicationReportResponse response = applicationsManager
					.getApplicationReport(request);
			ApplicationReport applicationReport = response.getApplicationReport();
			return applicationReport;
		}
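On the Job side, monitorAndPrintJob ultimately reduces to polling getApplicationReport until a terminal state is reached. A minimal sketch of that loop with a stubbed report source (ReportPoller and its State enum are illustrative; the Supplier stands in for ResourceMgrDelegate, and the real client sleeps between polls):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch: poll an application-report source until a terminal state appears,
// the way the client monitors the submitted MRAppMaster.
public class ReportPoller {
    public enum State { SUBMITTED, RUNNING, FINISHED, FAILED, KILLED }

    public static State pollUntilDone(Supplier<State> reportSource) {
        while (true) {
            State s = reportSource.get();
            if (s == State.FINISHED || s == State.FAILED || s == State.KILLED) {
                return s;  // terminal state: stop polling
            }
            // the real client sleeps between polls and prints progress; omitted
        }
    }

    public static void main(String[] args) {
        // Fake report source that walks through a fixed lifecycle
        Iterator<State> states =
            List.of(State.SUBMITTED, State.RUNNING, State.FINISHED).iterator();
        System.out.println(pollUntilDone(states::next));  // FINISHED
    }
}
```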
 

This concludes the client side (the machine that submits the job). From here the ResourceManager takes over and allocates a container to run the MRAppMaster.

2. The Server (ResourceManager)

To be continued...
