`
x-rip
  • 浏览: 107183 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

由Client启动ApplicationMaster的步骤

 
阅读更多

 

1. 申请Application

1) 连接ResourceManager

		YarnConfiguration yarnConf = new YarnConfiguration(conf);
		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
			YarnConfiguration.RM_ADDRESS,
			YarnConfiguration.DEFAULT_RM_ADDRESS));		
		LOG.info("Connecting to ResourceManager at "  rmAddress);
		ClientRMProtocol applicationsManager = ((ClientRMProtocol) rpc.getProxy(
			ClientRMProtocol.class, rmAddress, conf));
 

2) 向ResourceManager申请一个Application

		    GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);		
			GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
			LOG.info("Got new application id="  response.getApplicationId());
 

2. 初始化ApplicationMaster的上下文(

ApplicationId

ApplicationName

Queue:Application将被提交到的队列

Priority:Application的优先级

User:运行该Application的用户

AMContainerSpec:运行ApplicationMaster的Container的信息

)

1) 设置Application属性

		ApplicationId appId = response.getApplicationId();
		ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
		appContext.setApplicationId(appId);
		appContext.setApplicationName(appName);
 

2) 设置运行ApplicationMaster的Container的上下文

		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
		
		// 上传包含ApplicationMaster的jar包
		FileSystem fs = FileSystem.get(conf);
		Path src = new Path(appMasterJar);
		String pathSuffix = appName  "/"  appId.getId()  "/AppMaster.jar";	    
		Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
		fs.copyFromLocalFile(false, true, src, dst);
		FileStatus destStatus = fs.getFileStatus(dst);
		
		// 上传ApplicationMaster的log4j配置文件
		Path log4jSrc = new Path(log4jPropFile);
		Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
		fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
		FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
		
		// 设置LocalResources
		LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
		amJarRsrc.setType(LocalResourceType.FILE);
		amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
		amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); 
		amJarRsrc.setTimestamp(destStatus.getModificationTime());
		amJarRsrc.setSize(destStatus.getLen());
		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
		localResources.put("AppMaster.jar",  amJarRsrc);
		
		LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
		log4jRsrc.setType(LocalResourceType.FILE);
		log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
		log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
		log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
		log4jRsrc.setSize(log4jFileStatus.getLen());
		localResources.put("log4j.properties", log4jRsrc);
		
		amContainer.setLocalResources(localResources);
		
		// 设置运行ApplicationMaster的环境变量
		StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");  
		for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH).split(",")) {  
			classPathEnv.append(':');  
			classPathEnv.append(c.trim());  
		}  
		classPathEnv.append(":./log4j.properties");
		Map<String, String> env = new HashMap<String, String>();
		env.put("CLASSPATH", classPathEnv.toString());
		amContainer.setEnvironment(env);
		
		// 设置运行ApplicationMaster的命令
		Vector<CharSequence> vargs = new Vector<CharSequence>(30);
		vargs.add("${JAVA_HOME}" + "/bin/java");
		vargs.add("-Xmx" + amMemory + "m");
		vargs.add(appMasterMainClass);
		vargs.add(some args);
		vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
		vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
		StringBuilder command = new StringBuilder();
		for (CharSequence str : vargs) {
		  command.append(str).append(" ");
		} 
		List<String> commands = new ArrayList<String>();
		commands.add(command.toString());		
		amContainer.setCommands(commands);
		
		// 设置ApplicationMaster运行所需的内存
		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(amMemory);
		amContainer.setResource(capability);
		
		appContext.setAMContainerSpec(amContainer);
 

3) 设置优先级、队列、用户等

		Priority pri = Records.newRecord(Priority.class);
		pri.setPriority(amPriority);
		appContext.setPriority(pri);
		appContext.setQueue(amQueue);
		appContext.setUser(amUser);
 

3. 创建运行application的request,并提交

	SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);
	applicationsManager.submitApplication(appRequest);
 

4. 向ResourceManager轮训Application的状态

1) 构建获取Application状态的request并提交

		GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
		reportRequest.setApplicationId(appId);
		GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
		ApplicationReport report = reportResponse.getApplicationReport();
 

2) 获取Application的相关信息

		LOG.info("Got application report from ASM for"
			+ ", appId=" + appId.getId()
			+ ", clientToken=" + report.getClientToken()
			+ ", appDiagnostics=" + report.getDiagnostics()
			+ ", appMasterHost=" + report.getHost()
			+ ", appQueue=" + report.getQueue()
			+ ", appMasterRpcPort=" + report.getRpcPort()
			+ ", appStartTime=" + report.getStartTime()
			+ ", yarnAppState=" + report.getYarnApplicationState().toString()
			+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
			+ ", appTrackingUrl=" + report.getTrackingUrl()
			+ ", appUser=" + report.getUser());
		YarnApplicationState state = report.getYarnApplicationState();
		FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
 
分享到:
评论

相关推荐

    yarn基本运作流程

    - ResourceManager为ApplicationMaster分配第一个Container,并在该Container内启动ApplicationMaster。这个阶段,ApplicationMaster的主要职责是与ResourceManager协商资源,并准备执行任务。 3. **资源分配与任务...

    大数据 40 道面试题及答案.docx

    每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,...

    巴豆大数据团队讲师课件Spark.pdf

    YarnClient模式适用于交互和调试,而YarnCluster模式更适用于生产环境,两者的区别主要在于ApplicationMaster进程的不同使用方式。 在资源管理方面,文档说明了Yarn和Standalone两种资源管理组件,以及它们的Master...

    2021年超全超详细的最新大数据开发面试题及答案解析.pdf

    12. YARN任务提交流程:YARN任务提交流程包括Client端提交应用程序、ResourceManager分配资源、ApplicationMaster管理应用程序等步骤。 13. YARN资源调度三种模型:YARN提供了三种资源调度模型,包括FIFO调度、...

    董西成:Hadoop YARN程序设计与应用案例

    客户端通过这些库与ResourceManager交互,提交应用程序,而ApplicationMaster则与NodeManager交互来启动和管理容器。 总结来说,董西成在“Hadoop英雄会——暨Hadoop 10周年生日大趴”中详细介绍了Hadoop YARN的...

    YARN应用开发与核心源码剖析.pdf

    2. **提交应用**:客户端使用`ApplicationClientProtocol#submitApplication`方法将ApplicationMaster的相关信息提交给ResourceManager。 3. **启动Application Master**:ResourceManager根据...

    Hadoop基础面试题(附答案)

    - **ApplicationManager**:负责接收应用程序提交请求、启动ApplicationMaster以及监视其生命周期。 #### 二十、ResourceManager的作用 - **处理客户端请求**:接收来自客户端的请求。 - **启动/监控...

    hadoop-2.6.4 源码

    5. **启动和测试**:使用生成的可执行文件启动Hadoop服务,并通过fsShell或其他工具进行简单的数据操作,验证Hadoop是否正确安装和配置。 学习Hadoop-2.6.4源码可以帮助开发者深入理解分布式系统的设计原则,提升在...

    hadoop2.6.5源码zip

    源码分析是理解Hadoop工作原理、进行定制化开发或调试问题的关键步骤。`hadoop2.6.5源码zip`文件包含了完整的Hadoop 2.6.5源代码,开发者可以深入研究其内部机制,了解如何实现MapReduce计算模型、HDFS分布式文件...

Global site tag (gtag.js) - Google Analytics