1. 申请Container
1) 连接ResourceManager
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
YarnConfiguration yarnConf = new YarnConfiguration(conf);
// 获取ResourceManager的地址
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
AMRMProtocol resourceManager = ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
2) 向ResourceManager注册为ApplicationMaster
RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
// 设定该Application的相关信息
appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostname);
appMasterRequest.setRpcPort(appMasterRpcPort);
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterRequest);
3) 向ResourceManager申请Container
// 初始化申请Container的request (包含运行Container的host、优先级、占用内存)
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setHostName("*");
request.setNumContainers(numContainers);
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(requestPriority);
request.setPriority(pri);
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(containerMemory);
request.setCapability(capability);
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
resourceReq.add(request);
// 向ResourceManager发送报告 (包含第几次申请、需要申请Container的request、保存需要释放Container的List、已分配Container与总共Container的比例)
AllocateRequest req = Records.newRecord(AllocateRequest.class);
CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId
AtomicInteger rmRequestID = new AtomicInteger();
req.setResponseId(rmRequestID.incrementAndGet());
req.setApplicationAttemptId(appAttemptID);
req.addAllAsks(resourceReq);
req.addAllReleases(releasedContainers);
req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
AllocateResponse resp = resourceManager.allocate(req);
AMResponse amResp = resp.getAMResponse();
2. 为申请到的Container分配任务
1) 获取上面申请到的Container
List<Container> allocatedContainers = amResp.getAllocatedContainers();
for (Container allocatedContainer : allocatedContainers) {
}
2) 初始化运行Container的上下文 (
ContainerId
User:运行该Container的用户,即运行当前Application的用户
Resource:ResourceManager分配给该Container的资源
ContainerToken:Security模式下的SecurityTokens
LocalResources:该Container所运行的程序所需的资源,比如程序所在的jar包
ServiceData:
Environment:该Container所运行的程序所需的环境变量,KeyValue格式
Commands:该Container所运行程序的命令,比如运行的为java程序,即$JAVA_HOME/bin/java org.yourclass
ApplicationACLs:该Container所属的Application的访问控制列表
)
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
ctx.setContainerId(container.getId());
ctx.setResource(container.getResource());
ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
Map<String, String> childEnv = new HashMap<String, String>();
ctx.setEnvironment(childEnv);
// 设定LocalResource
// 将jar包上传到HDFS上
FileSystem fs = FileSystem.get(conf);
Path src = new Path(存放Container程序的本地路径);
String pathSuffix = appName + "/" + appId.getId() + "/ChildProgram.jar";
Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
fs.copyFromLocalFile(false, true, src, dst);
FileStatus destStatus = fs.getFileStatus(dst);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LocalResource childRsrc = Records.newRecord(LocalResource.class);
childRsrc.setType(LocalResourceType.FILE);
childRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
childRsrc.setTimestamp(destStatus.getModificationTime());
childRsrc.setSize(destStatus.getLen());
childRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(dst)));
localResources.put("Child.jar", amJarRsrc);
ctx.setLocalResources(localResources);
// 设定Command
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
vargs.add(JavaCommand);
vargs.add(JavaArgs);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
ctx.setCommands(commands);
StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
3) 连接该Container属于的ContainerManager
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
ContainerManager cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
4) 通过ContainerManager启动Container
cm.startContainer(startReq);
3. 轮询获取Container的状态
1) 向ContainerManager获取Container的状态
GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
statusReq.setContainerId(container.getId());
GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
ContainerStatus containerStatus = statusResp.getStatus();
4. 更新Application状态
1) 在所有Container运行成功/失败后通知ResourceManager该Application运行成功/失败
FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setAppAttemptId(appAttemptID);
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
finishReq.setDiagnostics(diagnostics);
resourceManager.finishApplicationMaster(finishReq);
分享到:
相关推荐
- NodeManager根据ApplicationMaster的指令,启动Container来运行具体的任务,如MapTask或ReduceTask。 4. **任务执行过程** - MapTask在Container内被执行,处理输入数据并生成中间结果。 - MapTask完成后,...
2. ResourceManager分配第一个Container给NodeManager,并指示在该Container中启动ApplicationMaster。 3. ApplicationMaster向ResourceManager注册,以便用户通过ResourceManager查看应用程序的状态。 4. ...
Service Registry 是一个服务发现机制,它允许 ApplicationMaster 在集群中找到所需的资源和服务。例如,如果某个服务需要与另一个服务通信,则可以通过 Service Registry 来定位目标服务的位置。 ##### 3. ...
2. RM 分配第一个 Container,并与对应的 NM 通信,要求它在这个 Container 启动应用程序的 ApplicationMaster。 3. AM 首先向 RM 注册,通过 RM 可以查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它...
2. **资源申请**:ResourceManager为作业启动一个ApplicationMaster,并分配第一个Container。 3. **初始化AM**:ApplicationMaster在某个NodeManager上启动,向ResourceManager申请更多资源。 4. **任务调度**:...
Yarn 是 Apache Hadoop 的一个核心组件,全称为 Yet Another Resource Negotiator,即另一种资源协调者。它作为大数据处理的基石,负责管理和调度集群中的计算资源,为各种分布式计算框架如 MapReduce、Spark 等提供...
2. ResourceManager为应用程序启动一个ApplicationMaster。 3. ApplicationMaster负责向ResourceManager申请资源,并与NodeManager通信以启动容器(Container)。 4. NodeManager在节点上执行任务,并向...
6. NM在获取到资源后,启动Container,下载并执行任务。 Yarn的调度器有多种,包括FIFO(先进先出)、Capacity Scheduler和Fair Scheduler,它们都致力于优化资源利用率和任务执行效率。调度算法是Yarn的核心部分,...
3. **任务调度**:ApplicationMaster将任务调度到分配的资源上,即在NodeManager上启动Container来执行任务。 4. **任务执行**:NodeManager在Container中执行任务,并监控任务的运行状态。 5. **状态汇报**:...
3. HDFS文件上传时DataNode故障处理:如果HDFS在上传文件时其中一个DataNode突然挂掉了,会自动将该块重复写入到其他DataNode,以确保数据的安全。 4. NameNode启动时的操作:NameNode启动时会加载FsImage、加载Edit...
2. 分配资源:ResourceManager为应用分配第一个Container,并启动ApplicationMaster。 3. 资源请求:ApplicationMaster向ResourceManager申请更多资源(Container)来运行任务。 4. 执行任务:ResourceManager将资源...
本例中的"___下载.zip"可能包含了一个基于YARN的应用示例,我们可以假设它是一个自定义的框架或工具,名为"Kitten"。 2. **理解ApplicationMaster**:ApplicationMaster是应用程序的控制器,负责与ResourceManager...
- 每次作业运行时都会启动一个ApplicationMaster实例。 - 负责向ResourceManager申请资源,并将任务分配给各个NodeManager。 - 监控任务的状态并向用户报告进度。 4. **Container** - 容器是YARN中的基本计算...
2. AM启动:RM为应用分配第一个Container,并在某个NM上启动AM。 3. 资源请求:AM向RM请求执行任务所需的Container。 4. 资源分配:RM根据全局策略分配Container,并通知NM。 5. 任务执行:NM在获得Container的NM上...
4. **ApplicationMaster**:每个MapReduce作业都有一个ApplicationMaster,负责与ResourceManager交互,获取资源,调度和监控任务。 5. **NodeManager**:节点级别的管理器,负责本节点上的Container生命周期管理和...
2. RM为应用程序分配第一个Container(资源容器)并通知NM启动AM。 3. AM与RM协商,申请更多Container来运行应用程序的任务。 4. NM接收AM的请求,启动任务,并向AM报告任务状态。 5. AM监控任务进度,根据需要向RM...
Hadoop YARN(Yet Another Resource Negotiator)是Hadoop生态系统中的一种新型资源管理框架,它为上层应用提供了一个统一的资源管理和调度平台。YARN的引入极大地提高了集群的利用率,实现了资源的有效统一管理,并...
- **Local 模式**:在本地环境中运行,用于调试,JobManager 和 TaskManager 在同一个JVM中运行。 - **Standalone 模式**:独立的分布式集群,不需要其他资源调度框架,JobManager 负责协调,TaskManager 执行任务...