概要
本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。
准备
- spark已经安装完毕
- spark运行在local mode或local-cluster mode
local-cluster mode
local-cluster模式也称为伪分布式,可以使用如下指令运行
MASTER=local[1,2,1024] bin/spark-shell
[1,2,1024] 分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512M
Driver Programme的初始化过程分析
初始化过程的涉及的主要源文件
- SparkContext.scala 整个初始化过程的入口
- SparkEnv.scala 创建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager
- DAGScheduler.scala 任务提交的入口,即将Job划分成各个stage的关键
- TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
- SchedulerBackend
- 最简单的单机运行模式的话,看LocalBackend.scala
- 如果是集群模式,看源文件SparkDeploySchedulerBackend
初始化过程步骤详解
步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager
private[spark] val env = SparkEnv.create(
conf,
"",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal)
SparkEnv.set(env)
步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
taskScheduler.start()
TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测
override def start() {
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
checkSpeculatableTasks()
}
}
}
步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
步骤4:启动WEB UI
ui.start()
RDD的转换过程
还是以最简单的wordcount为例说明rdd的转换过程
sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果
步骤1:val rawFile = sc.textFile("README.md")
textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析
scala> sc.textFile("README.md")
14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms
14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms
res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13
步骤2: val splittedText = rawFile.flatMap(line => line.split(" "))
flatMap将原来的MappedRDD转换成为FlatMappedRDD
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f))
步骤3:val wordCount = splittedText.map(word => (word, 1))
利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD
步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂
步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala
细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions
implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。
接下来再看一看reduceByKey的定义
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
if (getKeyClass().isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD
Log输出能证明我们的分析
res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13
RDD转换小结
小结一下整个RDD转换过程
HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD
整个转换过程好长啊,这一切的转换都发生在任务提交之前。
运行过程分析
数据集操作分类
在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?
对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.
在此基础上作一下简单的分类
- one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
- one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作
- many-one 多个dataset合并为一个dataset,如combine, join
- one-many 一个dataset分裂为多个dataset, 如groupBy
Task运行期的函数调用
task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation
- TaskRunner.run
- Task.run
- Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)
- RDD.iterator
- RDD.computeOrReadCheckpoint
- RDD.compute
- RDD.computeOrReadCheckpoint
- RDD.iterator
- Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)
- Task.run
或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
注意,这里最容易产生错觉的地方就是map函数,这里的map不是RDD中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator
堆栈输出
80 at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
81 at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154)
82 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
83 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
84 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
85 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
86 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
87 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
88 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
89 at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
90 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
91 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
92 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
93 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
94 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
95 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
96 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
97 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
98 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
99 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
100 at org.apache.spark.scheduler.Task.run(Task.scala:53)
101 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
ResultTask
compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。
override def runTask(context: TaskContext): U = {
metrics = Some(context.taskMetrics)
try {
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
计算结果的传递
上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.
那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下
- ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler
- DAGScheduler将MapStatus保存到MapOutputTrackerMaster中
- ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据
- 第一件事就是咨询MapOutputTrackerMaster所要取的数据的location
- 根据返回的结果调用BlockManager.getMultiple获取真正的数据
BlockStoreShuffleFetcher的fetch函数伪码
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)
注意上述代码中的getServerStatuses及getMultiple,一个是询问数据的位置,一个是去获取真正的数据。
有关Shuffle的详细解释,请参考”详细探究Spark的shuffle实现一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/
相关推荐
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7
weixin138社区互助养老+ssm(论文+源码)_kaic.zip
光纤到户及通信基础设施报装申请表.docx
项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7
功能完善的电商数据智能爬虫采集系统项目全套技术资料.zip
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
### Android程序开发初级教程(一):初识Android **平台概述** Google推出的Android操作系统平台已经正式亮相,这是一个基于Linux内核的开源操作系统。对于开发者而言,了解其架构和支持的开发语言至关重要。以下是Android平台的架构概览: **平台架构及功能** 1. **应用框架(Application Framework)**:包含可重用和可替换的组件,确保所有软件在该层面上的平等性。 2. **Dalvik虚拟机(Dalvik Virtual Machine)**:一个基于Linux的虚拟机,为Android应用提供运行环境。 3. **集成浏览器(Integrated Browser)**:基于开源WebKit引擎的浏览器,位于应用层。 4. **优化图形(Optimized Graphics)**:包括自定义的2D图形库和遵循OpenGL ES 1.0标准的3D实现。 5. **SQLite数据库**:用于数据存储。 6. **多媒体支持(Media Support)**:支持通用音频、视频以及多种图片格式(如MPEG4, H.264
内容概要:本文档是《组合数学答案-网络流传版.pdf》的内容,主要包含了排列组合的基础知识以及一些经典的组合数学题目。这些题目涵盖了从排列数计算、二项式定理的应用到容斥原理的实际应用等方面。通过对这些题目的解析,帮助读者加深对组合数学概念和技巧的理解。 适用人群:适合初学者和有一定基础的学习者。 使用场景及目标:可以在学习组合数学课程时作为练习题参考,也可以在复习考试或准备竞赛时使用,目的是提高解决组合数学问题的能力。 其他说明:文档中的题目覆盖了组合数学的基本知识点,适合逐步深入学习。每个题目都有详细的解答步骤,有助于读者掌握解题思路和方法。
.net core mvc在线考试系统asp.net考试系统源码考试管理系统 主要技术: 基于.net core mvc架构和sql server数据库,数据库访问采用EF core code first,前端采用vue.js和bootstrap。 功能模块: 系统包括前台和后台两个部分,分三种角色登录。 管理员登录后台,拥有科目管理,题库管理,考试管理,成绩管理,用户管理等功能。 教师登录后台,可进行题库管理,考试管理和成绩管理。 用户登录前台,可查看考试列表,参加考试,查看已考试的结果,修改密码等。 系统实现了国际化,支持中英两种语言。 源码打包: 包含全套源码,数据库文件,需求分析和代码说明文档。 运行环境: 运行需vs2019或者以上版本,sql server2012或者以上版本。
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
包含了登陆注册、用户管理、部门管理、文件管理、权限管理、日志管理、个人中心、数据字典和代码生成这九个功能模块 系统采用了基于角色的访问控制,角色和菜单关联,一个角色可以配置多个菜单权限;然后再将用户和角色关联,一位用户可以赋予多个角色。这样用户就可以根据角色拿到该有的菜单权限,更方便管理者进行权限管控。 本系统还封装了文件管理功能,在其他模块如若要实现图片/文件上传预览时,前端只需导入现成的 Vue 组件即可实现(使用 viewerjs 依赖实现),后端只需定义 String 类型的实体类变量即可,无需再去研究文件上传预览的相关功能,简化了开发者的工作量。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。
三相10Kw光伏并网逆变器。包含全套理图 PCB 源代码
GJB 5236-2004 军用软件质量度量文档,本称准规定了车用软件产品的质重模型和基本的度量。本标准为确定车用软件质量需求和衡量军用 软件产品的能力提供了一个框架。
基于MATLAB车牌识别系统【GUI含界面】.zip。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。
【宿舍管理系统】是一种专为高校或住宿机构设计的信息化解决方案,旨在提高宿舍管理的效率和准确性。该系统包含了多项核心功能,如宿舍管理员管理、宿舍信息维护、查询、卫生检查以及电费缴纳等,旨在实现全面的宿舍运营自动化。 **宿舍管理员管理**功能允许指定的管理员进行用户权限分配和角色设定。这包括对管理员账户的创建、修改和删除,以及设置不同的操作权限,例如只读、编辑或管理员权限。通过这样的权限控制,可以确保数据的安全性和管理的规范性。 **宿舍添加与管理**是系统的基础模块。管理员可以录入宿舍的基本信息,如宿舍号、楼栋、楼层、房间类型(单人间、双人间等)、容纳人数、设施配置等。此外,系统还支持批量导入或导出宿舍信息,方便数据的备份和迁移。 **查询功能**是系统的重要组成部分,它允许管理员和学生根据不同的条件(如宿舍号、楼栋、学生姓名等)快速查找宿舍信息。此外,系统还可以生成各种统计报告,如宿舍占用率、空闲宿舍数量等,以便于决策者进行资源优化。 **卫生检查**功能则是对宿舍卫生状况进行定期评估。管理员可设定检查计划,包括检查周期、评分标准等,并记录每次检查的结果。系统能自动生成卫生报表,用于
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
九缸星形发动机点火器3D
本项目可以作为小程序毕设项目,主要功能为音乐播放器,主要功能是:可以播放歌曲(采用mp3网络连接实现)、专辑封面播放时可以旋转,能够实现开始和暂停播放,可以点击下一首歌曲,主页面实现动态轮播图
出差审批单(表格模板).docx