`

YARN编程实例—distributedshell源码分析

 
阅读更多

1. 概述

本文介绍YARN自带的一个非常简单的应用程序编程实例—distributedshell,他可以看做YARN编程中的“hello world”,它的主要功能是并行执行用户提供的shell命令或者shell脚本。本文主要介绍distributedshell 的实现方法。

Distributedshell的源代码在文件夹

src\hadoop-yarn-project\hadoop-yarn\hadoop-yarn-applications\hadoop-yarn-applications-distributedshell下。

Distributedshell 的实现完全与文章“如何编写YARN应用程序”所描述的一般YARN应用程序的编写方法完全一致。

2. Distributedshell客户端源码分析

Distributedshell Client的入口main函数如下:

public static void main(String[] args) {

Client client = new Client();

boolean doRun = client.init(args);

if (!doRun) {

System.exit(0);

}

result = client.run();

}

DistributedShell Client中最重要的是函数为run(),该函数实现过程如下:

(1)构造RPC句柄。

利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager:

private void connectToASM() throws IOException {

YarnConfiguration yarnConf = new YarnConfiguration(conf);

InetSocketAddress rmAddress = yarnConf.getSocketAddr(

YarnConfiguration.RM_ADDRESS,

YarnConfiguration.DEFAULT_RM_ADDRESS,

YarnConfiguration.DEFAULT_RM_PORT);

LOG.info(“Connecting to ResourceManager at ” + rmAddress);

applicationsManager = ((ClientRMProtocol) rpc.getProxy(

ClientRMProtocol.class, rmAddress, conf));

}

(2)获取application id。

与ResourceManager通信,请求application id:

GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);

GetNewApplicationResponse response = applicationsManager.getNewApplication(request);

(3)构造ContainerLaunchContext。

构造一个用于运行ApplicationMaster的container,container相关信息被封装到ContainerLaunchContext对象中:

ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

//添加本地资源

//填充localResources

amContainer.setLocalResources(localResources);

//添加运行ApplicationMaster所需的环境变量

Map<String, String> env = new HashMap<String, String>();

//填充env

amContainer.setEnvironment(env);

//添加启动ApplicationMaster的命令

//填充commands;

amContainer.setCommands(commands);

//设置ApplicationMaster所需的资源

amContainer.setResource(capability);

(4)构造ApplicationSubmissionContext。

构造一个用于提交ApplicationMaster的ApplicationSubmissionContext:

ApplicationSubmissionContext appContext =

Records.newRecord(ApplicationSubmissionContext.class);

//设置application id,调用GetNewApplicationResponse#getApplicationId()

appContext.setApplicationId(appId);

//设置Application名称:“DistributedShell”

appContext.setApplicationName(appName);

//设置前面创建的container

appContext.setAMContainerSpec(amContainer);

//设置application的优先级,默认是0

pri.setPriority(amPriority);

//设置application的所在队列,默认是”"

appContext.setQueue(amQueue);

//设置application的所属用户,默认是”"

appContext.setUser(amUser);

(5)提交ApplicationMaster。

将ApplicationMaster提交到ResourceManager上,从而完成作业提交功能:

applicationsManager.submitApplication(appRequest);

(6)显示应用程序运行状态。

为了让用户知道应用程序进度,Client会每隔几秒在shell终端上打印一次应用程序运行状态:

while (true) {

Thread.sleep(1000);

GetApplicationReportRequest reportRequest =

Records.newRecord(GetApplicationReportRequest.class);

reportRequest.setApplicationId(appId);

GetApplicationReportResponse reportResponse =

applicationsManager.getApplicationReport(reportRequest);

ApplicationReport report = reportResponse.getApplicationReport();

//打印report内容

YarnApplicationState state = report.getYarnApplicationState();

FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();

if (YarnApplicationState.FINISHED == state) {

if (FinalApplicationStatus.SUCCEEDED == dsStatus) {

return true;

} else {

return false;

}

} else if (YarnApplicationState.KILLED == state

|| YarnApplicationState.FAILED == state) {

return false;

}

}

3. Distributedshell ApplicationMaster源码分析

Distributedshell ApplicationMaster的实现方法与“如何编写YARN应用程序”所描述的步骤完全一致,它的过程如下:

步骤1 ApplicationMaster由ResourceManager分配的一个container启用,之后,它与ResourceManager通信,注册自己,以告知自己所在的节点(host:port),trackingurl(客户端可通过该url直接查询AM运行状态)等。

RegisterApplicationMasterRequest appMasterRequest =

Records.newRecord(RegisterApplicationMasterRequest.class);

appMasterRequest.setApplicationAttemptId(appAttemptID);

appMasterRequest.setHost(appMasterHostname);

appMasterRequest.setRpcPort(appMasterRpcPort);

appMasterRequest.setTrackingUrl(appMasterTrackingUrl);

return resourceManager.registerApplicationMaster(appMasterRequest);

步骤2 ApplicationMaster周期性向ResourceManager发送心跳信息,以告知ResourceManager自己仍然活着,这是通过周期性调用AMRMProtocol#allocate实现的。

步骤3 为了完成计算任务,ApplicationMaster需向ResourceManage发送一个ResourceRequest描述对资源的需求,包括container个数、期望资源所在的节点、需要的CPU和内存等,而ResourceManager则为ApplicationMaster返回一个AllocateResponse结构以告知新分配到的container列表、运行完成的container列表和当前可用的资源量等信息。

while (numCompletedContainers.get() < numTotalContainers

&& !appDone) {

Thread.sleep(1000);

List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();

if (askCount > 0) {

ResourceRequest containerAsk = setupContainerAskForRM(askCount);

resourceReq.add(containerAsk);

}

//如果resourceReq为null,则可看做心跳信息,否则就是申请资源

AMResponse amResp =sendContainerAskToRM(resourceReq);

}

步骤4 对于每个新分配到的container,ApplicationMaster将创建一个ContainerLaunchContext对象,该对象包含container id,启动container所需环境、启动container命令,然后与对应的节点通信,以启动container。

LaunchContainerRunnable runnableLaunchContainer =

new LaunchContainerRunnable(allocatedContainer);

//每个container由一个线程启动

Thread launchThread = new Thread(runnableLaunchContainer);

launchThreads.add(launchThread);

launchThread.start();

步骤5 ApplicationMaster通过AMRMProtocol#allocate获取各个container的运行状况,一旦发现某个container失败了,则会重新向ResourceManager发送资源请求,以重新运行失败的container。

步骤6 作业运行失败后,ApplicationMaster向ResourceManager发送FinishApplicationMasterRequest请求,以告知自己运行结束。

FinishApplicationMasterRequest finishReq =

Records.newRecord(FinishApplicationMasterRequest.class);

finishReq.setAppAttemptId(appAttemptID);

boolean isSuccess = true;

if (numFailedContainers.get() == 0) {

finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);

}

分享到:
评论

相关推荐

    Yarn编程ApplicationList

    这段代码创建了一个`YarnClient`实例,发送一个请求获取所有应用程序,然后遍历并打印出每个应用的ID。 通过理解YARN的工作机制和提供的API,开发者可以构建强大的工具来管理和监控Hadoop集群上的应用程序,确保...

    MapReduce2.0源码分析与实战编程

    《MapReduce2.0源码分析与实战编程》是一本深度探讨Hadoop生态系统中的核心组件MapReduce 2.0(也称为YARN)的专著。MapReduce是大数据处理领域的重要框架,它提供了并行计算的能力,使得海量数据的处理变得高效可行...

    Yarn框架代码详细分析

    3. 应用程序主节点(ApplicationMaster, AM):负责管理运行在YARN上的每一个应用程序实例的生命周期,包括资源申请、任务调度、任务监控和容错等。 YARN框架的优势主要体现在: - 可扩展性:YARN通过将资源管理和...

    基于SpringBoot的Hadoop-Yarn资源监控系统源码.zip

    总的来说,这个项目为Hadoop YARN集群的管理者提供了一个强大的工具,利用SpringBoot的便利性和YARN的监控接口,实现对资源的实时监控和分析,有助于提升集群的效率和稳定性。通过深入研究源码,开发者可以学习到...

    YARN框架代码详细分析

    YARN(Yet Another Resource Negotiator),即另一种资源协调者,是Apache Hadoop项目的一部分,它代表了Hadoop第二代的MapReduce框架,也就是Hadoop 2.0。YARN的主要目的是为了解决之前版本Hadoop的扩展性和资源...

    MapReduce2.0源码分析与实战编程 文字注释版

    《MapReduce2.0源码分析与实战编程》是一本深度解析Hadoop MapReduce框架的书籍,其中包含详细的源码注释,旨在帮助读者深入理解MapReduce的工作原理,并能进行实际编程应用。这本书的重点在于剖析MapReduce的核心...

    dshell:基于原生的yarn-distributedshell上的二次开发

    dshell基于原生的yarn-distributedshell上的二次开发将普通的java程序提交到yarn上执行,通过yarn进行统一的资源管理调度。增加了参数-container_files 和 -container_archives 用来向每一个container中传输java执行...

    hadoop 源码解析_yarn源码解析

    Hadoop 源码解析_Yarn 源码解析 Hadoop 是一个基于 Java 的大数据处理框架,Yarn 是 Hadoop 的资源管理器,负责资源分配、任务调度和集群管理。下面是 Yarn 源码解析的知识点: 1. MR 程序提交 MR(MapReduce)...

    Storm 源码分析

    ### Storm源码分析 #### 一、Storm简介与应用场景 Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时...

    hadoop-yarn-applications-distributedshell-2.6.0.jar

    java运行依赖jar包

    深入理解Spark:核心思想及源码分析.pdf

    《深入理解Spark:核心思想及源码分析》这本书旨在帮助读者深入掌握Apache Spark这一大数据处理框架的核心原理与实现细节。Spark作为一个快速、通用且可扩展的数据处理系统,已经在大数据领域得到了广泛应用。它提供...

    Hadoop技术内幕深入解析YARN架构设计与实现原理.董西成

    《Hadoop技术内幕深入解析YARN架构设计与实现原理》是由董西成撰写的一本专著,主要聚焦于Hadoop生态系统中的YARN(Yet Another Resource Negotiator)框架。这本书详细阐述了YARN的架构设计原则、核心组件以及其...

    hadoop-yarn-applications-distributedshell-2.6.0-sources.jar

    java运行依赖jar包

    yarn-v1.19.1.tar.gz

    同时,通过阅读源码,开发者可以学习到Node.js模块化、异步编程、包管理等方面的最佳实践。 总的来说,`yarn-v1.19.1.tar.gz`提供了深入研究Yarn这个强大工具的机会,无论是为了提升自己的技术能力,还是为了参与到...

    Yarn框架代码详细分析V0.5

    Hadoop的2.0版本的yarn的框架介绍啊 Hadoop yarnYARN 本身框架的优势是扩展性与支持多计算模型。对于扩展性目前主要体现在计算节点规模上,以前 JobTracker-TaskTracker 模型下最多大约在 5000 台机器左右,对于 ...

Global site tag (gtag.js) - Google Analytics