`
wbj0110
  • 浏览: 1617571 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Apache Spark源码走读之2 -- Job的提交与运行

阅读更多

概要

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。

实验环境搭建

在进行后续操作前,确保下列条件已满足。

  1. 下载spark binary 0.9.1
  2. 安装scala
  3. 安装sbt
  4. 安装java

启动spark-shell

单机模式运行,即local模式

local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

MASTER=local bin/spark-shell

"MASTER=local"就是表明当前运行在单机模式

local cluster方式运行

local cluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别如下

  1. 启动master
  2. 启动worker
  3. 启动spark-shell

master

$SPARK_HOME/sbin/start-master.sh

 注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。

master主要是运行类 org.apache.spark.deploy.master.Master,在8080端口启动监听,日志如下图所示

修改配置

  1. 进入$SPARK_HOME/conf目录
  2. 将spark-env.sh.template重命名为spark-env.sh
  3. 修改spark-env.sh,添加如下内容
export SPARK_MASTER_IP=localhost
export SPARK_LOCAL_IP=localhost

运行worker

bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

worker启动完成,连接到master。打开maser的web ui可以看到连接上来的worker. Master WEb UI的监听地址是http://localhost:8080

启动spark-shell

MASTER=spark://localhost:7077 bin/spark-shell

如果一切顺利,将看到下面的提示信息。

Created spark context..
Spark context available as sc.

可以用浏览器打开localhost:4040来查看如下内容

  1. stages
  2. storage
  3. environment
  4. executors

wordcount

上述环境准备妥当之后,我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

上述代码统计在README.md中含有Spark的行数有多少

部署过程详解

Spark布置环境中组件构成如下图所示。

Spark cluster components

  • Driver Program 简要来说在spark-shell中输入的wordcount语句对应于上图的Driver Program.
  • Cluster Manager 就是对应于上面提到的master,主要起到deploy management的作用
  • Worker Node 与Master相比,这是slave node。上面运行各个executor,executor可以对应于线程。executor处理两种基本的业务逻辑,一种就是driver programme,另一种就是job在提交之后拆分成各个stage,每个stage可以运行一到多个task

Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。

JOB的生成和运行

job生成的简单流程如下

  1. 首先应用程序创建SparkContext的实例,如实例为sc
  2. 利用SparkContext的实例来创建生成RDD
  3. 经过一连串的transformation操作,原始的RDD转换成为其它类型的RDD
  4. 当action作用于转换之后RDD时,会调用SparkContext的runJob方法
  5. sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处

 调用路径大致如下

  1. sc.runJob->dagScheduler.runJob->submitJob
  2. DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor
  3. eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数
  4. job到stage的转换,生成finalStage并提交运行,关键是调用submitStage
  5. 在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖窄依赖两种
  6. 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
  7. 提交task是调用函数submitMissingTasks来完成
  8. task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks
  9. TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend
  10. LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件
  11. receiveOffers->executor.launchTask->TaskRunner.run

代码片段executor.lauchTask


 def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内。

运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒。

http://www.cnblogs.com/hseagle/p/3673123.html

分享到:
评论

相关推荐

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    ### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...

    Apache Spark源码走读之4 -- DStream实时流数据处理

    ### Apache Spark源码走读之四:DStream实时流数据处理 #### 一、系统概述与流数据特性 本文档探讨了Apache Spark Streaming的核心概念之一——**DStream**(Discretized Stream)及其如何实现对实时流数据的有效...

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    ### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...

    Apache Spark源码走读:如何进行代码跟读

    ### Apache Spark源码走读:如何进行代码跟读 #### 概述 本文旨在探讨如何有效地进行Apache Spark源码的阅读与理解。Apache Spark作为一款高性能的分布式计算框架,在大数据处理领域占据着重要地位。其核心由Scala...

    Apache_Spark源码走读

    ### Apache Spark 源码解析概述 #### 一、引言 Apache Spark 是一款开源的大规模数据处理框架,因其高效性、灵活性以及易用性在大数据处理领域得到了广泛的应用。对于想要深入了解Spark内部机制的人来说,阅读其...

    ApacheSpark源码走读(二)

    Spark作为一个非常优秀的并行处理框架,集成了一些并行化的算法也是理所当然。Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口。本文就Graphx的代码架构及PageRank在Graphx中的具体实现做...

    C++代码走读意见--开发注意事项

    ### C++代码走读意见与开发注意事项 #### 内存管理与安全性 在软件开发过程中,尤其是使用C++这类提供底层内存操作的语言时,代码质量和安全性尤为重要。本篇将基于给定的“C++代码走读意见--开发注意事项”文件中...

    Storm源码走读笔记

    本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...

    mina源码走读与实例

    ### MINA源码走读与实例 #### 一、MINA概述 **MINA**(**M**ulti **I**nterface **N**etwork **A**pplication)是Apache组织下的一款开源网络通信框架,它主要针对TCP/IP、UDP/IP协议栈提供了高效的封装和扩展能力...

    java8源码-java8-source:java8源码走读

    IDEA走读Java源码坏境搭建 新建一个普通java项目(如:java8-source) 创建package(tech.sqlclub.java_source)存放java源码 java源码在$JAVA_HOME/src.zip 解压就行,mac用户JAVA_HOME查看如下图: 通过Debug,撸...

    hadoop源码分析-HDFS部分

    Hadoop,作为开源大数据处理的基石,其核心组件之一就是HDFS(Hadoop Distributed File System),这是一个高度容错性的分布式文件系统,设计用于运行在廉价硬件上,能够处理大规模的数据。HDFS为大数据处理提供了...

    nova-compute源码分析

    ### nova-compute源码分析 #### 一、Nova概述及工作职责 **1.1 Nova的角色与任务** Nova是OpenStack项目中一个至关重要的组成部分,它主要负责虚拟机实例的生命周期管理,包括创建、调度、运行和销毁等功能。具体...

    Blog-Atheros_Ath9k速率调整算法源码走读_琴剑飘零1

    ### Ath9k速率调整算法解析 ...与之相对比,Minstrel算法凭借其更加灵活和高效的特性,在实际应用中表现更佳。对于开发者而言,深入研究这两种算法的工作原理和实现细节将有助于优化无线通信系统的性能。

    Blog-Atheros_minstrel速率调整算法源码走读 _琴剑飘零1

    《Atheros Minstrel 速率调整算法源码解析》 Minstrel 速率调整算法是 Atheros 无线网卡驱动中用于优化无线通信性能的关键算法,它通过动态调整发送速率来适应网络环境的变化,以提高无线网络的吞吐量和稳定性。...

    学生走读管理暂行办法.docx

    2. **申请走读的流程**: - 学生需通过易三江平台填写申请表,提交相关证明材料给辅导员审核。 - 经过学院或校区负责人审批后,签订走读协议,报学生发展与服务中心备案。 3. **走读手续的有效期与管理规定**: ...

    代码走读记录表模板代码走读记录表模板

    代码走读记录表模板代码走读记录表模板代码走读记录表模板

    spark-2.1.1:spark原始物走读注解解

    《Spark 2.1.1:深度解析与源码阅读笔记》 Spark作为一个开源的分布式计算框架,以其高效、易用和灵活性深受大数据处理领域的欢迎。Spark 2.1.1是其发展中的一个重要版本,它在性能优化、功能增强以及稳定性上都有...

Global site tag (gtag.js) - Google Analytics