1. 申请Application
1) 连接ResourceManager
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at " rmAddress);
ClientRMProtocol applicationsManager = ((ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, rmAddress, conf));
2) 向ResourceManager申请一个Application
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
LOG.info("Got new application id=" response.getApplicationId());
2. 初始化ApplicationMaster的上下文(
ApplicationId
ApplicationName
Queue:Application将被提交到的队列
Priority:Application的优先级
User:运行该Application的用户
AMContainerSpec:运行ApplicationMaster的Container的信息
)
1) 设置Application属性
ApplicationId appId = response.getApplicationId();
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(appId);
appContext.setApplicationName(appName);
2) 设置运行ApplicationMaster的Container的上下文
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// 上传包含ApplicationMaster的jar包
FileSystem fs = FileSystem.get(conf);
Path src = new Path(appMasterJar);
String pathSuffix = appName "/" appId.getId() "/AppMaster.jar";
Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
fs.copyFromLocalFile(false, true, src, dst);
FileStatus destStatus = fs.getFileStatus(dst);
// 上传ApplicationMaster的log4j配置文件
Path log4jSrc = new Path(log4jPropFile);
Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
// 设置LocalResources
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
amJarRsrc.setType(LocalResourceType.FILE);
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
amJarRsrc.setTimestamp(destStatus.getModificationTime());
amJarRsrc.setSize(destStatus.getLen());
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put("AppMaster.jar", amJarRsrc);
LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
log4jRsrc.setType(LocalResourceType.FILE);
log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
log4jRsrc.setSize(log4jFileStatus.getLen());
localResources.put("log4j.properties", log4jRsrc);
amContainer.setLocalResources(localResources);
// 设置运行ApplicationMaster的环境变量
StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH).split(",")) {
classPathEnv.append(':');
classPathEnv.append(c.trim());
}
classPathEnv.append(":./log4j.properties");
Map<String, String> env = new HashMap<String, String>();
env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
// 设置运行ApplicationMaster的命令
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add("${JAVA_HOME}" + "/bin/java");
vargs.add("-Xmx" + amMemory + "m");
vargs.add(appMasterMainClass);
vargs.add(some args);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
amContainer.setCommands(commands);
// 设置ApplicationMaster运行所需的内存
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
amContainer.setResource(capability);
appContext.setAMContainerSpec(amContainer);
3) 设置优先级、队列、用户等
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(amPriority);
appContext.setPriority(pri);
appContext.setQueue(amQueue);
appContext.setUser(amUser);
3. 创建运行application的request,并提交
SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
applicationsManager.submitApplication(appRequest);
4. 向ResourceManager轮训Application的状态
1) 构建获取Application状态的request并提交
GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(appId);
GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
ApplicationReport report = reportResponse.getApplicationReport();
2) 获取Application的相关信息
LOG.info("Got application report from ASM for"
+ ", appId=" + appId.getId()
+ ", clientToken=" + report.getClientToken()
+ ", appDiagnostics=" + report.getDiagnostics()
+ ", appMasterHost=" + report.getHost()
+ ", appQueue=" + report.getQueue()
+ ", appMasterRpcPort=" + report.getRpcPort()
+ ", appStartTime=" + report.getStartTime()
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ ", appTrackingUrl=" + report.getTrackingUrl()
+ ", appUser=" + report.getUser());
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
分享到:
相关推荐
- ResourceManager为ApplicationMaster分配第一个Container,并在该Container内启动ApplicationMaster。这个阶段,ApplicationMaster的主要职责是与ResourceManager协商资源,并准备执行任务。 3. **资源分配与任务...
每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,...
YarnClient模式适用于交互和调试,而YarnCluster模式更适用于生产环境,两者的区别主要在于ApplicationMaster进程的不同使用方式。 在资源管理方面,文档说明了Yarn和Standalone两种资源管理组件,以及它们的Master...
12. YARN任务提交流程:YARN任务提交流程包括Client端提交应用程序、ResourceManager分配资源、ApplicationMaster管理应用程序等步骤。 13. YARN资源调度三种模型:YARN提供了三种资源调度模型,包括FIFO调度、...
客户端通过这些库与ResourceManager交互,提交应用程序,而ApplicationMaster则与NodeManager交互来启动和管理容器。 总结来说,董西成在“Hadoop英雄会——暨Hadoop 10周年生日大趴”中详细介绍了Hadoop YARN的...
2. **提交应用**:客户端使用`ApplicationClientProtocol#submitApplication`方法将ApplicationMaster的相关信息提交给ResourceManager。 3. **启动Application Master**:ResourceManager根据...
- **ApplicationManager**:负责接收应用程序提交请求、启动ApplicationMaster以及监视其生命周期。 #### 二十、ResourceManager的作用 - **处理客户端请求**:接收来自客户端的请求。 - **启动/监控...
5. **启动和测试**:使用生成的可执行文件启动Hadoop服务,并通过fsShell或其他工具进行简单的数据操作,验证Hadoop是否正确安装和配置。 学习Hadoop-2.6.4源码可以帮助开发者深入理解分布式系统的设计原则,提升在...
源码分析是理解Hadoop工作原理、进行定制化开发或调试问题的关键步骤。`hadoop2.6.5源码zip`文件包含了完整的Hadoop 2.6.5源代码,开发者可以深入研究其内部机制,了解如何实现MapReduce计算模型、HDFS分布式文件...