`

YARN/MRv2 MRAppMaster深入剖析—ContainerAllocator分析

 
阅读更多

1. ContainerAllocator概述

ContainerAllocator负责与ResourceManager通信,为作业申请资源。作业的每个任务资源需求可描述为四元组<Priority, hostname,capability,containers>,分别表示作业优先级、期望资源所在的host,资源量(当前仅支持内存),container数目,比如:

<10, “node1”, “memory:1G”, 3>//优先级是一个正整数,优先级值越小,优先级越高

<10, “node2”, “memory:2G”, 10>

<2, “*”, “memory:1G”, 20> //*表示这样的资源可来自任意一个节点,即不考虑数据本地性

ContainerAllocator周期性通过心跳与ResourceManager通信,ResourceManager每次会返回已经分配的container列表,完成的container列表等信息。

2. ContainerAllocator工作流程

当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列map task和reduce task,由于reduce task依赖于map task之间结果,所以reduce task会延后调度。在ContainerAllocator中,当map task数目完成一定比例(由mapreduce.job.reduce.slowstart.completedmaps指定,默认是0.05,即5%)且Reduce Task可允许占用的资源(Reduce Task可占用资源比由yarn.app.mapreduce.am.job.reduce.rampup.limit指定)能够折合成整数个任务时,才会调度Reduce Task。

考虑到Map Task和Reduce Task之间的依赖关系,因此,它们之间的数据结构转移也是不一样的,对于Map Task而言,会依次转移到以下几个数据结构中:

scheduled->assigned->completed

对于Reduce Task而言,则按照以下流程:

pending->scheduled->assigned->completed

其中,pengding表示等待ContainerAllocator发送资源请求,scheduled表示已经将资源请求发送给RM,但还没有收到分配的资源,assigned是已经收到RM分配的资源,completed已经未完成。

Reduce Task之所有多出一个pending,主要是为了根据Map Task情况调整Reduce Task状态(在pengding和scheduled中相互转移)。进一步说,这主要是为了防止Map Task饿死,因为在YARN中不再有map slot和reduce slot的概念(这两个概念从一定程度上减少了作业饿死的可能性),只有内存、CPU等真实的资源,需要由ApplicationMaster控制资源申请的顺序,以防止可能产生的作业饿死。

此外,ContainerAllocator将所有任务划分成三类,分别是failed Map、Map和Reduce,并分别赋予它们优先级5、20和10,也就是说,当三种任务同时有资源需求是,会优先分配给failed map,然后是reduce,最后是map。

总结起来,ContainerAllocator工作流程如下:

步骤1 将所有map task的资源需求一次性发送给RM

步骤2 如果达到了Reduce task调度条件,则开始为Reduce Task申请资源。

步骤3 如果为某个task申请到了资源,则取消其他重复资源的申请。由于在HDFS中,任何一个任务通常有三备份,而对于一个任务而言,考虑到rack和any级别的本地性,它可能会对应7个资源请求,分别是:

<20, “node1”, “memory:1G”, 1>

<20, “node2”, “memory:1G”, 1>

<20, “node3”, “memory:1G”, 1>

<20, “rack1”, “memory:1G”, 1>

<20, “rack2”, “memory:1G”, 1>

<20, “rack3”, “memory:1G”, 1>

<20, “*”, “memory:1G”, 1>

一旦该任务获取了以上任何一种资源,则会取消其他6个的资源申请。

步骤4 如果任务运行失败,则会重新为该任务申请资源。

步骤5 如果一个任务运行速度过慢,则会为其额外申请资源以启动备份任务(如果启动了推测执行功能)。

步骤6 如果一个节点失败的任务数目过多,则会撤销对该节点的所有资源申请请求。

3. ContainerAllocator类图

ContainerAllocator实际上是一接口,它只定义了三个事件:CONTAINER_REQ,,CONTAINER_DEALLOCATE和CONTAINER_FAILED,分别表示请求container,释放container和container运行失败。

ContainerAllocator的实现是RMContainerAllocator,它只接收和处理ContainerAllocator接口中定义的三种事件,它的运行是这三种事件驱动的。

RMContainerAllocator中最核心的框架是维护了一个心跳信息,在RMCommunicator类中实现如下:

 while (!stopped.get() && !Thread.currentThread().isInterrupted()) {

   try {

     Thread.sleep(rmPollInterval);

     try {

       heartbeat();

     } catch (YarnException e) {

     LOG.error("Error communicating with RM: " + e.getMessage() , e);

     return;

 } catch (Exception e) {

   LOG.error("ERROR IN CONTACTING RM. ", e);

 }

 } catch (InterruptedException e) {

   LOG.warn("Allocated thread interrupted. Returning.");

   return;

  }

}

其中,heartbeat()函数定义(在RMContainerAllocator类中)如下:

protected synchronized void heartbeat() throws Exception {

  LOG.info("Before Scheduling: " + getStat());

  List<Container> allocatedContainers = getResources();

  LOG.info("After Scheduling: " + getStat());

  if (allocatedContainers.size() > 0) {

    LOG.info("Before Assign: " + getStat());

    scheduledRequests.assign(allocatedContainers);

    LOG.info("After Assign: " + getStat());

  }

  ……

}

其中,getResources()函数用于向RM发送心跳信息,并处理心跳应答。需要注意的是,有些情况下,心跳信息中并不包含新的资源请求信息,即空的心跳信息,这有以下几个作用:

(1)周期性发送心跳,告诉RM自己还活着。

(2)周期性询问RM,以获取新分配的资源和各个container运行状况。

assign()函数是将收到的container分配给某个任务,如果这个container无法分配下去(比如内存空间不够),则是在下次心跳中通知RM释放该container,如果container可以分下去,则会释放对应任务的其他资源请求,同时会向TaskAttempt发送一个TA_ASSIGNED事件,以通知ContainerLauncher启动container。

分享到:
评论

相关推荐

    YARN(MRv2)搭建

    YARN(MRv2)搭建

    yarn-v0.23.2.tar.gz

    yarn-v0.23.2.tar.gz 在安装ambari,源码编译的时候下载的文件有问题 手动下载 地址 https://github.com/yarnpkg/yarn/releases/download/v0.23.2/yarn-v0.23.2.tar.gz

    Flink on Yarn_K8S原理剖析及实践.pdf

    Apache Flink是一个开源的分布式流处理框架,用于在...通过这些资源,用户可以深入了解Flink在分布式环境中的运行机制,掌握在YARN/K8S平台上部署Flink集群的最佳实践,并对Flink的核心概念和技术细节有更全面的认识。

    【自动化脚本】自动启动hdfs/yarn/spark HA集群

    脚本使用:vim编辑脚本,按照自己的配置修改主机号,我的是hadoop1、2是NN;hadoop2、3是Spark Master;hadoop3还是RM;hadoop4、5、6是DN、NM、Spark Worker。编辑完成后在满足“前提”的任意一台主机运行均可。 ...

    yarn-v1.22.5.tar.gz

    2. **锁定文件**:Yarn 使用 `yarn.lock` 文件来确保所有开发者的环境中安装的包版本一致,解决了 npm 的版本漂移问题,提高了项目的可预测性和可重复性。 3. **并行下载**:Yarn 能够并行下载依赖包,显著提升了...

    YARN应用开发与核心源码剖析.pdf

    对于深入理解YARN的工作机制,对源码的剖析是非常必要的。这有助于开发者更准确地把握各个组件之间的交互细节,从而更好地利用YARN的功能。例如,通过跟踪`getNewApplication`和`submitApplication`方法的具体实现,...

    YARN框架代码详细分析

    3. MRAppMaster分配器:涉及MRAppMaster的代码分析、任务周期管理及资源分配、以及shuffle过程。MRAppMaster负责应用程序的任务调度、监控、资源管理等,而shuffle过程则主要涉及MapReduce任务的数据排序和重分布。 ...

    Yarn框架代码详细分析

    YARN的WEB框架解析包括了对YARN WEB界面代码的分析以及它提供的一些功能,如集群信息的查看、资源使用情况的监控等。 总之,YARN作为Hadoop生态系统中的重要组件,通过它的设计和代码实现了对大规模分布式计算资源...

    使用yarn create umi安装Ant Design Pro时报错TypeError: self.env.emit is not a function

    TypeError: self.env.emit is not a function at /usr/local/share/.config/yarn/global/node_modules/yeoman-generator/lib/index.js:653:22 at processTicksAndRejections (internal/process/task_queues.js:97:5)...

    使用 Redis,Redict,Valkey,等进行 express-rate-limit 的速率限制存储 .zip

    rate-limit-redis redis中间件的存储 。...安装来自 npm 注册表# Using npm&gt; npm install rate-limit-redis# Using yarn or pnpm&gt; yarn/pnpm add rate-limit-redis来自 Github 发布# Using npm&gt; npm install ...

    yarn-workspace-plugin-since

    在深入探讨这个插件之前,我们首先需要理解Yarn工作区的基本概念。 Yarn工作区是Yarn包管理器的一个特性,允许在一个大的项目中管理多个相互依赖的子项目或库。它通过在根目录创建一个`package.json`文件,并在其中...

    Yarn框架代码详细分析V0.3.pdf

    ### Yarn框架代码详细分析V0.3.pdf 知识点提炼 #### 一、Yarn框架概述 ...通过对各个模块及其功能的深入分析,我们可以更好地理解YARN的工作原理和应用场景,这对于开发和维护基于YARN的大规模分布式系统至关重要。

    SPARK2_ON_YARN-2.4.0.cloudera2.jar

    SPARK2_ON_YARN-2.4.0 jar包下载

    YARN Essentials.PDF

    - **MapReduce v2 (MRv2):** 作为 YARN 的一部分,支持新的 API,提高了灵活性和性能。 **2. MapReduce v1 与 MapReduce v2 的对比:** - **API 变更:** MRv2 提供了新的 API,允许开发者编写更复杂的应用程序。 ...

    Hadoop的yarn详解

    Hadoop的YARN架构是Hadoop版本2.x引入的一个重要组件,它负责处理资源管理和作业调度,而核心的计算任务处理则交给了MapReduce、Tez、Spark等计算框架。YARN的出现是为了解决Hadoop早期版本中的可扩展性问题,它通过...

    Yarn获取Application列表编码

    获取 Application 列表编码是 Yarn 中的一种常见操作,本文将对其进行详细的分析和介绍。 Yarn 获取 Application 列表编码概述 Yarn 获取 Application 列表编码是指通过 Yarn 客户端 API 获取当前 Hadoop 集群中的...

    YARN.Essentials

    这里首先介绍了Hadoop 2.x的发布,对Hadoop 1.x及其MapReduce API的简要介绍,以及MRv1与MRv2的对比。接着,文档讨论了YARN在Hadoop中的作用,以及MapReduce API的向后兼容性和org.apache.hadoop.mapred API的二进制...

    Hadoop技术内幕深入解析YARN架构设计与实现原理PDF

    《Hadoop技术内幕深入解析YARN架构设计与实现原理》这本书深入探讨了Hadoop生态系统中的核心组件YARN(Yet Another Resource Negotiator),它是Hadoop 2.x版本中的关键改进,旨在解决早期Hadoop MapReduce的资源...

Global site tag (gtag.js) - Google Analytics