`
大涛学长
  • 浏览: 105968 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

一张图轻松掌握 Flink on YARN 应用启动全流程(上)

阅读更多
Flink 支持 Standalone 独立部署和 YARN、Kubernetes、Mesos 等集群部署模式,其中 YARN 集群部署模式在国内的应用越来越广泛。Flink 社区将推出 Flink on YARN 应用解读系列文章,分为上、下两篇。本文基于 FLIP-6 重构后的资源调度模型将介绍 Flink on YARN 应用启动全流程,并进行详细步骤解析。下篇将根据社区大群反馈,解答客户端和Flink Cluster的常见问题,分享相关问题的排查思路。

Flink on YARN 流程图
Flink on YARN集群部署模式涉及YARN和Flink两大开源框架,应用启动流程的很多环节交织在一起,为了便于大家理解,在一张图上画出了Flink on YARN基础架构和应用启动全流程,并对关键角色和流程进行了介绍说明,整个启动流程又被划分成客户端提交(流程标注为紫色)、Flink Cluster启动和Job提交运行(流程标注为橙色)两个阶段分别阐述,由于分支和细节太多,本文会忽略掉一些,只介绍关键流程(基于Flink开源1.9版本源码整理)。





客户端提交流程
1.执行命令:bin/flink run -d -m yarn-cluster ...或bin/yarn-session.sh ...来提交per-job运行模式或session运行模式的应用;

2.解析命令参数项并初始化,启动指定运行模式,如果是per-job运行模式将根据命令行参数指定的Job主类创建job graph;

如果可以从命令行参数(-yid )或YARN properties临时文件(${java.io.tmpdir}/.yarn-properties-${user.name})中获取应用ID,向指定的应用提交Job;
否则当命令行参数中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定YARN集群模式),启动per-job运行模式;
否则当命令行参数项不包含 -yq(表示查询YARN集群可用资源)时,启动session运行模式;
3.获取YARN集群信息、新应用ID并启动运行前检查;

通过YarnClient向YARN ResourceManager(下文缩写为:YARN RM,YARN Master节点,负责整个集群资源的管理和调度)请求创建一个新应用(YARN RM收到创建应用请求后生成新应用ID和container申请的资源上限后返回),并且获取YARN Slave节点报告(YARN RM返回全部slave节点的ID、状态、rack、http地址、总资源、已使用资源等信息);
运行前检查:(1) 简单验证YARN集群能否访问;(2) 最大node资源能否满足flink JobManager/TaskManager vcores资源申请需求;(3) 指定queue是否存在(不存在也只是打印WARN信息,后续向YARN提交时排除异常并退出);(4)当预期应用申请的Container资源会超出YARN资源限制时抛出异常并退出;(5) 当预期应用申请不能被满足时(例如总资源超出YARN集群可用资源总量、Container申请资源超出NM可用资源最大值等)提供一些参考信息。
4.将应用配置(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(flink jars、ship files、user jars、job graph等)上传至分布式存储(例如HDFS)的应用暂存目录(/user/${user.name}/.flink/);

5.准备应用提交上下文(ApplicationSubmissionContext,包括应用的名称、类型、队列、标签等信息和应用Master的container的环境变量、classpath、资源大小等),注册处理部署失败的shutdown hook(清理应用对应的HDFS目录),然后通过YarnClient向YARN RM提交应用;

6.循环等待直到应用状态为RUNNING,包含两个阶段:

循环等待应用提交成功(SUBMITTED):默认每隔200ms通过YarnClient获取应用报告,如果应用状态不是NEW和NEW_SAVING则认为提交成功并退出循环,每循环10次会将当前的应用状态输出至日志:"Application submission is not finished, submitted application is still in ",提交成功后输出日志:"Submitted application "
循环等待应用正常运行(RUNNING):每隔250ms通过YarnClient获取应用报告,每轮循环也会将当前的应用状态输出至日志:"Deploying cluster, current state "。应用状态成功变为RUNNING后将输出日志"YARN application has been deployed successfully." 并退出循环,如果等到的是非预期状态如FAILED/FINISHED/KILLED,就会在输出YARN返回的诊断信息("The YARN application unexpectedly switched to state during deployment. Diagnostics from YARN: ...")之后抛出异常并退出。
Flink Cluster启动流程
1.YARN RM中的ClientRMService(为普通用户提供的RPC服务组件,处理来自客户端的各种RPC请求,比如查询YARN集群信息,提交、终止应用等)接收到应用提交请求,简单校验后将请求转交给RMAppManager(YARN RM内部管理应用生命周期的组件);

2.RMAppManager根据应用提交上下文内容创建初始状态为NEW的应用,将应用状态持久化到RM状态存储服务(例如ZooKeeper集群,RM状态存储服务用来保证RM重启、HA切换或发生故障后集群应用能够正常恢复,后续流程中的涉及状态存储时不再赘述),应用状态变为NEW_SAVING;

3.应用状态存储完成后,应用状态变为SUBMITTED;RMAppManager开始向ResourceScheduler(YARN RM可拔插资源调度器,YARN自带三种调度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最广泛,FifoScheduler功能最简单基本不可用,今年社区已明确不再继续支持FairScheduler,建议已有用户迁至CapacityScheduler)提交应用,如果无法正常提交(例如队列不存在、不是叶子队列、队列已停用、超出队列最大应用数限制等)则抛出拒绝该应用,应用状态先变为FINAL_SAVING触发应用状态存储流程并在完成后变为FAILED;如果提交成功,应用状态变为ACCEPTED;

4.开始创建应用运行实例(ApplicationAttempt,由于一次运行实例中最重要的组件是ApplicationMaster,下文简称AM,它的状态代表了ApplicationAttempt的当前状态,所以ApplicationAttempt实际也代表了AM),初始状态为NEW;

5.初始化应用运行实例信息,并向ApplicationMasterService(AM&RM协议接口服务,处理来自AM的请求,主要包括注册和心跳)注册,应用实例状态变为SUBMITTED;

6.RMAppManager维护的应用实例开始初始化AM资源申请信息并重新校验队列,然后向ResourceScheduler申请AM Container(Container是YARN中资源的抽象,包含了内存、CPU等多维度资源),应用实例状态变为ACCEPTED;

7.ResourceScheduler会根据优先级(队列/应用/请求每个维度都有优先级配置)从根队列开始层层递进,先后选择当前优先级最高的子队列、应用直至具体某个请求,然后结合集群资源分布等情况作出分配决策,AM Container分配成功后,应用实例状态变为ALLOCATED_SAVING,并触发应用实例状态存储流程,存储成功后应用实例状态变为ALLOCATED;

8.RMAppManager维护的应用实例开始通知ApplicationMasterLauncher(AM生命周期管理服务,负责启动或清理AM container)启动AM container,ApplicationMasterLauncher与YARN NodeManager(下文简称YARN NM,与YARN RM保持通信,负责管理单个节点上的全部资源、Container生命周期、附属服务等,监控节点健康状况和Container资源使用)建立通信并请求启动AM container;

9.ContainerManager(YARN NM核心组件,管理所有Container的生命周期)接收到AM container启动请求,YARN NM开始校验Container Token及资源文件,创建应用实例和Container实例并存储至本地,结果返回后应用实例状态变为LAUNCHED;

10.ResourceLocalizationService(资源本地化服务,负责Container所需资源的本地化。它能够按照描述从HDFS上下载Container所需的文件资源,并尽量将它们分摊到各个磁盘上以防止出现访问热点)初始化各种服务组件、创建工作目录、从HDFS下载运行所需的各种资源至Container工作目录(路径为: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);

11.ContainersLauncher(负责container的具体操作,包括启动、重启、恢复和清理等)将待运行Container所需的环境变量和运行命令写到Container工作目录下的launch_container.sh脚本中,然后运行该脚本启动Container;

12.Container进程加载并运行ClusterEntrypoint(Flink JobManager入口类,每种集群部署模式和应用运行模式都有相应的实现,例如在YARN集群部署模式下,per-job应用运行模式实现类是YarnJobClusterEntrypoint,session应用运行模式实现类是YarnSessionClusterEntrypoint),首先初始化相关运行环境:

输出各软件版本及运行环境信息、命令行参数项、classpath等信息;
注册处理各种SIGNAL的handler:记录到日志
注册JVM关闭保障的shutdown hook:避免JVM退出时被其他shutdown hook阻塞
打印YARN运行环境信息:用户名
从运行目录中加载flink conf
初始化文件系统
创建并启动各类内部服务(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)
将RPC address和port更新到flink conf配置
13.启动ResourceManager(Flink资源管理核心组件,包含YarnResourceManager和SlotManager两个子组件,YarnResourceManager负责外部资源管理,与YARN RM建立通信并保持心跳,申请或释放TaskManager资源,注销应用等;SlotManager则负责内部资源管理,维护全部Slot信息和状态)及相关服务,创建异步AMRMClient,开始注册AM,注册成功后每隔一段时间(心跳间隔配置项:${yarn.heartbeat.interval},默认5s)向YARN RM发送心跳来发送资源更新请求和接受资源变更结果。YARN RM内部该应用和应用运行实例的状态都变为RUNNING,并通知AMLivelinessMonitor服务监控AM是否存活状态,当心跳超过一定时间(默认10分钟)触发AM failover流程;

14.启动Dispatcher(负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的 JobManager)及相关服务(包括REST endpoint等),在per-job运行模式下,Dispatcher将直接从Container工作目录加载JobGraph文件;在session运行模式下,Dispatcher将在接收客户端提交的Job(_通过BlockServer接收job graph文件)后再进行后续流程;

15.根据JobGraph启动JobManager(负责作业调度、管理Job和Task的生命周期),构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构);

16.JobManager开始执行ExecutionGraph,向ResourceManager申请资源;

17.ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN RM申请新的Container资源来启动TaskManager进程;后续流程如果有空闲Slot资源,SlotManager将其分配给等待请求队列中匹配的请求,不用再通过18. YarnResourceManager申请新的TaskManager;

**18.YARN ApplicationMasterService接收到资源请求后,解析出新的资源请求并更新应用请求信息;

19.YARN ResourceScheduler成功为该应用分配资源后更新应用信息,ApplicationMasterService接收到Flink JobManager的下一次心跳时返回新分配资源信息;

20.Flink ResourceManager接收到新分配的Container资源后,准备好TaskManager启动上下文(ContainerLauncherContext,生成TaskManager配置并上传至分布式存储,配置其他依赖和环境变量等),然后向YARN NM申请启动TaskManager进程,YARN NM启动Container的流程与AM Container启动流程基本类似,区别在于应用实例在NM上已存在并未RUNNING状态时则跳过应用实例初始化流程,这里不再赘述;

21.TaskManager进程加载并运行YarnTaskExecutorRunner(Flink TaskManager入口类),初始化流程完成后启动TaskExecutor(负责执行Task相关操作);

22.TaskExecutor启动后先向ResourceManager注册,成功后再向SlotManager汇报自己的Slot资源与状态;

SlotManager接收到Slot空闲资源后主动触发Slot分配,从等待请求队列中选出合适的资源请求后,向

TaskManager请求该Slot资源

23.TaskManager收到请求后检查该Slot是否可分配(不存在则返回异常信息)、Job是否已注册(没有则先注册再分配Slot),检查通过后将Slot分配给JobManager;

24.JobManager检查Slot分配是否重复,通过后通知Execution执行部署task流程,向TaskExecutor提交task;

TaskExecutor启动新的线程运行Task。



原文链接:   https://yq.aliyun.com/articles/719262?utm_content=g_1000078167 

本文为云栖社区原创内容,未经允许不得转载。
  • 大小: 680.8 KB
分享到:
评论

相关推荐

    flink on yarn.txt

    编译环境: flink on yarn flink版本:flink1.8.0 scala版本:scala_2.1.2 hadoop版本:hadoop2.6.0-cdh5.9.3 系统:cetos6.4 final

    FLINK_ON_YARN-1.14.3.jar

    CDH6.2 FLINK on yarn : FLINK_ON_YARN-1.14.3.jar

    Flink on Yarn_K8S原理剖析及实践.pdf

    在YARN上运行Flink作业时,Flink的任务管理者(TaskManager)会在YARN的节点上启动,并与Flink的作业管理者(JobManager)协同工作,通过YARN的调度器进行资源的申请和任务的调度执行。 Kubernetes是目前最流行的...

    cdh6.3.1 FLINK_ON_YARN-1.14.0.jar

    CDH6.3.1集成安装flink on yarn服务(通过parcel制作生成的csd文件) 文件名:FLINK_ON_YARN-1.14.0.jar 相关信息如下: 1. flink版本号:1.14.0 2. 系统版本:CentOS7.7 3. CDH版本:6.3.1 4. 扩展版本号:BIN-...

    flink on yarn 缺少依赖NoClassDefFoundError: com/sun/jersey

    flink-hadoop-compatibility_2.12-1.7.1.jar javax.ws.rs-api-2.0.1.jar jersey-common-2.27.jar jersey-core-1.19.4.jar 解决 Exception in thread "main" java.lang.NoClassDefFoundError: ...

    flink on yarn 模式下,flink 1.12.5版本运行所需完整lib包

    在大数据处理领域,Apache Flink 是一个流行的流处理框架,而 Apache Hadoop YARN(Yet Another Resource Negotiator)则是管理...理解并熟练掌握这些知识点对于任何在 YARN 上运行 Flink 的开发者来说都是至关重要的。

    Flink的standalone模式集群启动的流程及注意事项

    3.将主节点配置的文件复制到其他节点上scp -r /flink-1.9.2/ root@192.168.198.131:/yanxiaobo/(133节点同此命令) 4.进入bin目录执行 ./start-cluster.sh启动集群 (之前要做好ssh免密设置,可以省去输密码的麻烦...

    15-Flink from YARN to Kubernetes: 资源优化和容器化实践

    内容概要:本文详细介绍了从 Flink on Yarn 到 Flink on Kubernetes 的迁移过程,探讨了两者之间的资源利用率、隔离性和扩展性的对比,并提出了具体的实施步骤。文中指出,相比于 Flink on Yarn,Flink on ...

    flink-1.13.1 cdh6.3.2

    4. **启动Flink服务**:通过`flink-yarn-session.sh`或`flink-standalone.sh`命令启动Flink服务。 5. **提交作业**:使用`FLINK_ON_YARN-1.13.1.jar`提交你的Flink作业到YARN。 6. **监控和管理**:通过Flink的Web...

    CDH6.3.2编译Flink-1.12.4

    6. **集成到CDH**:将编译好的Flink发布到CDH集群,可能需要将JAR包上传至HDFS,并在YARN上配置相应的应用管理器和服务启动脚本。 7. **测试运行**:最后,你可以通过提交一个简单的Flink作业到YARN来验证集成是否...

    flink依赖jar包——解决NoClassDefFoundError: com/sun/jersey

    当flink on yarn模式运行时,发生如下异常信息,需要将压缩包中的4个依赖jar包放入flink安装路径下的lib目录下。 Exception in thread "main" java.lang.NoClassDefFoundError: ...

    FLINK_ON_YARN-1.12.0.jar

    flink1.12.0 适用于cdh5.2~6.4,系统 el6

    flink-yarn_2.11-1.12.2.jar

    flink on yarn 部署模式需要的jar

    flink新版本bat启动文件.zip

    本文将围绕“flink新版本bat启动文件.zip”这一主题,详细阐述Flink的启动过程,以及如何在新版本中解决bin目录下缺少bat启动文件的问题。 在Flink的早期版本中,通常会在`bin`目录下提供`.bat`文件,以供Windows...

    藏经阁-Deploy Apache Flink Natively on YARN_Kubernetes.pdf

    Flink Standalone Cluster on YARN 通过 ResourceManager 和 ContainerManager 实现资源管理和容器管理。ResourceManager 负责管理集群中的资源,ContainerManager 负责管理容器的生命周期。然而,这种方式存在静态...

    flink-1.12-CDH-6.3.2.zip

    标题“flink-1.12-CDH-6.3.2....其中,FLINK_ON_YARN jar文件用于在YARN上启动Flink,而 parcel 文件则简化了在CDH环境中的部署步骤。通过这些组件,用户可以充分利用Flink的强大流处理能力和CDH的数据处理生态系统。

    yarn调度流程.docx

    * 支持多种应用程序:YARN 可以支持多种应用程序,包括 MapReduce、Spark、Flink 等。 YARN 的应用场景 YARN 的应用场景包括: * 大数据处理:YARN 可以用来处理大规模的数据,例如 Log 处理、数据挖掘等。 * ...

    flink零基础入门.pdf

    Apache Flink 进阶(四):Flink on Yarn/K8s 原理剖析及实践 41 Apache Flink 进阶(五):数据类型和序列化 60 Apache Flink 进阶(六):Flink 作业执行深度解析 71 Apache Flink 进阶(七):网络流控及反压剖析...

    flink-on-yarn-1.13.2.jar

    本人搭建cdh6.3.2环境时候集成flink1.13.2组件使用的flink_on_yarn-1.13.2.jar资源包,已验证可以使用,是本人制作parcel的时候的成品

Global site tag (gtag.js) - Google Analytics