`
x-rip
  • 浏览: 106924 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

由ApplicationMaster启动一个Container的步骤

 
阅读更多

 

 

1. 申请Container

1) 连接ResourceManager

		Configuration conf = new Configuration();
		YarnRPC rpc = YarnRPC.create(conf);
		YarnConfiguration yarnConf = new YarnConfiguration(conf);
		// 获取ResourceManager的地址
		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
			YarnConfiguration.RM_SCHEDULER_ADDRESS,
			YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
		AMRMProtocol resourceManager = ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));	
    

 

2) 向ResourceManager注册为ApplicationMaster

		RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
		// 设定该Application的相关信息
		appMasterRequest.setApplicationAttemptId(appAttemptID);
		appMasterRequest.setHost(appMasterHostname);
		appMasterRequest.setRpcPort(appMasterRpcPort);
		appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
		RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterRequest);
 

 

3) 向ResourceManager申请Container

		// 初始化申请Container的request (包含运行Container的host、优先级、占用内存)
		ResourceRequest request = Records.newRecord(ResourceRequest.class);
		request.setHostName("*");
		request.setNumContainers(numContainers);
		Priority pri = Records.newRecord(Priority.class);
		pri.setPriority(requestPriority);
		request.setPriority(pri);
		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(containerMemory);
		request.setCapability(capability);
		List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
		resourceReq.add(request);
		// 向ResourceManager发送报告 (包含第几次申请、需要申请Container的request、保存需要释放Container的List、已分配Container与总共Container的比例)
		AllocateRequest req = Records.newRecord(AllocateRequest.class);
		CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId
		AtomicInteger rmRequestID = new AtomicInteger();
		req.setResponseId(rmRequestID.incrementAndGet());
		req.setApplicationAttemptId(appAttemptID);
		req.addAllAsks(resourceReq);
		req.addAllReleases(releasedContainers);
		req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
		AllocateResponse resp = resourceManager.allocate(req);
		AMResponse amResp = resp.getAMResponse();
   

 

2. 为申请到的Container分配任务

1) 获取上面申请到的Container

 

		List<Container> allocatedContainers = amResp.getAllocatedContainers();
		for (Container allocatedContainer : allocatedContainers) {
			
		}
 

2) 初始化运行Container的上下文 (

ContainerId

User:运行该Container的用户,即运行当前Application的用户

Resource:ResourceManager分配给该Container的资源

ContainerToken:Security模式下的SecurityTokens

LocalResources:该Container所运行的程序所需的资源,比如程序所在的jar包

ServiceData:

Environment:该Container所运行的程序所需的环境变量,KeyValue格式

Commands:该Container所运行程序的命令,比如运行的为java程序,即$JAVA_HOME/bin/java org.yourclass

ApplicationACLs:该Container所属的Application的访问控制列表

)

 

		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
		ctx.setContainerId(container.getId());
		ctx.setResource(container.getResource());
		ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
		
		Map<String, String> childEnv = new HashMap<String, String>();
		ctx.setEnvironment(childEnv);
		
		// 设定LocalResource
		// 将jar包上传到HDFS上
		FileSystem fs = FileSystem.get(conf);
		Path src = new Path(存放Container程序的本地路径);
		String pathSuffix = appName + "/" + appId.getId() + "/ChildProgram.jar";	    
		Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
		fs.copyFromLocalFile(false, true, src, dst);
		FileStatus destStatus = fs.getFileStatus(dst);
		
		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
		LocalResource childRsrc = Records.newRecord(LocalResource.class);
        childRsrc.setType(LocalResourceType.FILE);
        childRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
	    childRsrc.setTimestamp(destStatus.getModificationTime());
		childRsrc.setSize(destStatus.getLen());	
        childRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(dst)));
		localResources.put("Child.jar",  amJarRsrc);
		ctx.setLocalResources(localResources);
		
		// 设定Command
		Vector<CharSequence> vargs = new Vector<CharSequence>(5); 
		vargs.add(JavaCommand);
		vargs.add(JavaArgs);
		vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
		vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
		StringBuilder command = new StringBuilder();
		for (CharSequence str : vargs) {
			command.append(str).append(" ");
		}
		List<String> commands = new ArrayList<String>();
		commands.add(command.toString());
		ctx.setCommands(commands);
		
		StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
		startReq.setContainerLaunchContext(ctx);
 

3) 连接该Container属于的ContainerManager

 

		String cmIpPortStr = container.getNodeId().getHost() + ":"
			+ container.getNodeId().getPort();
        InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
		ContainerManager cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
 

4) 通过ContainerManager启动Container

 

		cm.startContainer(startReq);
 

3. 轮询获取Container的状态

1) 向ContainerManager获取Container的状态

 

		GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
		statusReq.setContainerId(container.getId());
        GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
		ContainerStatus containerStatus = statusResp.getStatus();
 

4. 更新Application状态

1) 在所有Container运行成功/失败后通知ResourceManager该Application运行成功/失败

 

		FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
		finishReq.setAppAttemptId(appAttemptID);
		finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
		// finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
		finishReq.setDiagnostics(diagnostics);
		resourceManager.finishApplicationMaster(finishReq);
分享到:
评论

相关推荐

    yarn基本运作流程

    - NodeManager根据ApplicationMaster的指令,启动Container来运行具体的任务,如MapTask或ReduceTask。 4. **任务执行过程** - MapTask在Container内被执行,处理输入数据并生成中间结果。 - MapTask完成后,...

    Hadoop-Yarn.docx

    2. ResourceManager分配第一个Container给NodeManager,并指示在该Container中启动ApplicationMaster。 3. ApplicationMaster向ResourceManager注册,以便用户通过ResourceManager查看应用程序的状态。 4. ...

    YARN Application Development.

    Service Registry 是一个服务发现机制,它允许 ApplicationMaster 在集群中找到所需的资源和服务。例如,如果某个服务需要与另一个服务通信,则可以通过 Service Registry 来定位目标服务的位置。 ##### 3. ...

    YARN 基础架构,工作机制,任务调度器

    2. RM 分配第一个 Container,并与对应的 NM 通信,要求它在这个 Container 启动应用程序的 ApplicationMaster。 3. AM 首先向 RM 注册,通过 RM 可以查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它...

    Hadoop新框架Yarn详解.pdf

    2. **资源申请**:ResourceManager为作业启动一个ApplicationMaster,并分配第一个Container。 3. **初始化AM**:ApplicationMaster在某个NodeManager上启动,向ResourceManager申请更多资源。 4. **任务调度**:...

    19_尚硅谷大数据之MapReduce_Yarn1

    Yarn 是 Apache Hadoop 的一个核心组件,全称为 Yet Another Resource Negotiator,即另一种资源协调者。它作为大数据处理的基石,负责管理和调度集群中的计算资源,为各种分布式计算框架如 MapReduce、Spark 等提供...

    YARN应用场景、原理与资源调度v2.pdf

    2. ResourceManager为应用程序启动一个ApplicationMaster。 3. ApplicationMaster负责向ResourceManager申请资源,并与NodeManager通信以启动容器(Container)。 4. NodeManager在节点上执行任务,并向...

    大数据课程-Hadoop集群程序设计与开发-5.Yarn资源调度器_lk_edit.pptx

    6. NM在获取到资源后,启动Container,下载并执行任务。 Yarn的调度器有多种,包括FIFO(先进先出)、Capacity Scheduler和Fair Scheduler,它们都致力于优化资源利用率和任务执行效率。调度算法是Yarn的核心部分,...

    Hadoop资源管理器YARN详解

    3. **任务调度**:ApplicationMaster将任务调度到分配的资源上,即在NodeManager上启动Container来执行任务。 4. **任务执行**:NodeManager在Container中执行任务,并监控任务的运行状态。 5. **状态汇报**:...

    2021年超全超详细的最新大数据开发面试题及答案解析.pdf

    3. HDFS文件上传时DataNode故障处理:如果HDFS在上传文件时其中一个DataNode突然挂掉了,会自动将该块重复写入到其他DataNode,以确保数据的安全。 4. NameNode启动时的操作:NameNode启动时会加载FsImage、加载Edit...

    Yarn资源调用demo案例

    2. 分配资源:ResourceManager为应用分配第一个Container,并启动ApplicationMaster。 3. 资源请求:ApplicationMaster向ResourceManager申请更多资源(Container)来运行任务。 4. 执行任务:ResourceManager将资源...

    编写YARN应用程序的快速而有趣的方式。___下载.zip

    本例中的"___下载.zip"可能包含了一个基于YARN的应用示例,我们可以假设它是一个自定义的框架或工具,名为"Kitten"。 2. **理解ApplicationMaster**:ApplicationMaster是应用程序的控制器,负责与ResourceManager...

    mapreduce八股文

    - 每次作业运行时都会启动一个ApplicationMaster实例。 - 负责向ResourceManager申请资源,并将任务分配给各个NodeManager。 - 监控任务的状态并向用户报告进度。 4. **Container** - 容器是YARN中的基本计算...

    HadoopYARN权威指南【中、英文版】

    2. AM启动:RM为应用分配第一个Container,并在某个NM上启动AM。 3. 资源请求:AM向RM请求执行任务所需的Container。 4. 资源分配:RM根据全局策略分配Container,并通知NM。 5. 任务执行:NM在获得Container的NM上...

    hadoop 2.2 安装包

    4. **ApplicationMaster**:每个MapReduce作业都有一个ApplicationMaster,负责与ResourceManager交互,获取资源,调度和监控任务。 5. **NodeManager**:节点级别的管理器,负责本节点上的Container生命周期管理和...

    yarn-1.17.3.zip

    2. RM为应用程序分配第一个Container(资源容器)并通知NM启动AM。 3. AM与RM协商,申请更多Container来运行应用程序的任务。 4. NM接收AM的请求,启动任务,并向AM报告任务状态。 5. AM监控任务进度,根据需要向RM...

    YARN应用开发与核心源码剖析.pdf

    Hadoop YARN(Yet Another Resource Negotiator)是Hadoop生态系统中的一种新型资源管理框架,它为上层应用提供了一个统一的资源管理和调度平台。YARN的引入极大地提高了集群的利用率,实现了资源的有效统一管理,并...

    【大数据学习资料】Flink 八大主题面试准备.pdf

    - **Local 模式**:在本地环境中运行,用于调试,JobManager 和 TaskManager 在同一个JVM中运行。 - **Standalone 模式**:独立的分布式集群,不需要其他资源调度框架,JobManager 负责协调,TaskManager 执行任务...

Global site tag (gtag.js) - Google Analytics