关于Spark的Hash based shuffle,其实已经在http://bit1129.iteye.com/blog/2180214中进行了基本的分析,不过那会对shuffle本身就不甚了解,分析之时有只见树木不见森林之惑,所以Hash Based Shuffle的整体流程并没有分析到位,但是那里却对一些常见的易犯迷糊的问题进行了总结,现在看上去,总结的着实不错,是时候从头到尾把自己写的东西看一遍了,现在是总结了原材料,再看就是升华阶段。同时,Spark的100篇博客,此目标不动摇!
Shuffle流程,第一个问题是什么时候需要Shuffle。Shuffle是由于RDD之间的依赖关系为ShuffleDependency而产生的,那么ShuffleDependency是什么时候建立的。RDD之间的依赖是DAGScheduler划分Stage的关键所在。给定两个RDD a和b,如果b对a的依赖类型是ShuffleDependency,那么划分Stage时,a是一个Stage(称为stage1)的结束,而b是另一个Stage(称为stage2)的开始。b所在的Stage称为shuffle stage。那么RDD之间的这种依赖关系是什么建立的呢?答案:是在RDD算子操作创建RDD时创建了RDD之间的依赖关系,也就是RDD创建之后,就可以通过getDependencies来获取它的依赖了。
因此本文从DAGScheduler根据ShuffleDependency划分Stage开始入手。
DAGScheduler根据ShuffleDependency划分Stage
1. 当作业提交给DAGScheduler,在DAGSchedule的 handleJobSubmitted方法中,参数RDD就是整个job的最后一个RDD,DAGScheduler需要做的是,从后向前回溯整个RDD 的依赖关系图,然后根据RDD之间的依赖关系,创建出不同的Stage,然后从前往后依次将Stage中的任务提交。如下是DAGScheduler的handleJobSubmitted方法
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_], ///这是整个Job的computing chain上的最后一个RDD,RDD之间的依赖已经确立
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int], ///整数数组,是Reducers的个数
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
{
var finalStage: Stage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)///创建Stage,注意第三个参数,ShuffleDependency初始值为None,jobId是在DAGScheduler的submitJob方法中建立的
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
if (finalStage != null) { ///finalStage为null表示什么情况?
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) ///创建Job,参数是finalStage,那么finalStage应该携带者父Stage
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
job.jobId, callSite.shortForm, partitions.length, allowLocal))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents) ///列出finalStage依赖的Stage
logInfo("Missing parents: " + getMissingParentStages(finalStage))////日志中列出尚未提交执行的父Stages
val shouldRunLocally =
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job ///Map缓存
activeJobs += job ///记录当前获取的Job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
submitStage(finalStage) ///提交作业
}
}
////如果finalStage为null,则直接提交
submitWaitingStages()
}
在上面的代码中,有三个主要的方法
newStage
submitStage
submitWaitingStages
下面分别来谈
2. newStage方法
2.1 newStage方法
private def newStage(
rdd: RDD[_], ///这是最后一个Stage的final stage,此处的Dependency关系尚未建立
numTasks: Int,
shuffleDep: Option[ShuffleDependency[_, _, _]], ///handleJobSubmitted方法调用时,传入的值为空
jobId: Int,
callSite: CallSite)
: Stage = ///返回
{
val parentStages = getParentStages(rdd, jobId) ///根据rdd和jobId获取父Stages,那么getParentStages是否会调用newStage方法呢?
val id = nextStageId.getAndIncrement() ///StageId
val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite) ///Stage有id,包含的最后一个RDD,shuffleDep,父Stages,jobId
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
2.2 getParentStages源代码:
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided jobId if they haven't already been created with a lower jobId.
*/
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { ///针对指定的RDD获取它的Parent Stages
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]] ///RDD堆栈
def visit(r: RDD[_]) { ///当前visit的RDD,无返回值
if (!visited(r)) { ///如果还没有遍历这个RDD
visited += r ///将r放入到已遍历的visited集合中
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) { ////遍历RDD依赖的Dependency,可以是NarrowDependency,可以是ShuffleDependency
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
parents += getShuffleMapStage(shufDep, jobId) ///遇到一个ShfufleDependency, 则返回一个parent stage,
case _ => ///如果是窄依赖,则直接压入堆栈,下次再遍历这个RDD
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd) ///将最后一个RDD放入
while (!waitingForVisit.isEmpty) { ///最后一个RDD放入,所以第一次访问时不为空;while循环中的判断逻辑,visit调用完后就会执行一次,而visit则会更新waitingForVisit
visit(waitingForVisit.pop()) ///visit方法每次访问的都是最后一个压入堆栈的RDD
}
parents.toList
}
2.3 getShuffleMapStage方法源代码
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage ///如果shuffleId和Stage已经建立了对应关系,则直接返回shuffleToMapStage中保存的Stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId) ////注册registerShuffleDependency
// Then register current shuffleDep
val stage = ///调用newOrUsedStage方法
newOrUsedStage(
shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
shuffleDep.rdd.creationSite)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
getShuffleMapStage获取ShuffleMapStage的逻辑中,主要的两个方法
registerShuffleDependencies
newOrUsedStage
Spark涉及的角色图
示例程序
package spark.examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCountHashShuffle {
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
val conf = new SparkConf()
conf.setAppName("SparkWordCount")
conf.setMaster("local[3]")
//Hash based Shuffle;
conf.set("spark.shuffle.manager", "hash");
val sc = new SparkContext(conf)
val rdd = sc.textFile("file:///D:/word.in.3",4);
val rdd1 = rdd.flatMap(_.split(" "))
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.reduceByKey(_ + _, 3);
rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
println(rdd3.toDebugString)
sc.stop
}
}
整个DAG图表示如下:
MappedRDD[5]///这个rdd没有打印出来,是在saveAsTextFile调用时,内部创建的MappedRDD,将Value转换为(K,V)的形式,K是null,但是DAG构造Stage图时会构造这个
(3) ShuffledRDD[4] at reduceByKey at SparkWordCountHashShuffle.scala:20 []
+-(5) MappedRDD[3] at map at SparkWordCountHashShuffle.scala:19 []
| FlatMappedRDD[2] at flatMap at SparkWordCountHashShuffle.scala:18 []
| file:///D:/word.in.3 MappedRDD[1] at textFile at SparkWordCountHashShuffle.scala:17 []
| file:///D:/word.in.3 HadoopRDD[0] at textFile at SparkWordCountHashShuffle.scala:17 []
Hash based Shuffle Writer源代码解析
1. hash based shuffle writer操作发生在ShuffleMapTask的runTask中,首先看看runTask方法
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
return writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
分析下runTask做了哪些事情
- 首先,runTask返回的是一个MapStatus对象,这个里面存放的是什么信息呢?
- 反序列化出来一个rdd和dep的二元组,此二者所指什么
- 从SparkEnv中获取shuffleManager,对于Hash Based Shuffle,这个是HashShuffleManager,它获取ShuffleWriter的依据dep.shuffleHandle, partitionId, context三方面信息。那么dep的shuffleHandle何时创建,里面包含了哪些信息,partitionId指的是该Task要处理的partition。它是要做Shuffle的RDD的一个partitionId。对于对于Hash Based Shuffle,这个ShuffleWriter是HashShuffleWriter
- 调用HashShuffleWriter的write方法将rdd的一个partition写入到map output file中。
- 写完后,执行HashShuffleWriter的stop方法返回MapStatus对象
接下来,逐步的分析上面的五步具体的逻辑
2. rdd和dep的生成
rdd指的是示例代码中的rdd2,这个rdd指的是要进行shuffle算子操作的RDD,而计算得到的rdd3,(MappedRDD)依赖于这个rdd。
3.dep的shuffleHandle生成
dep中的shuffleHandle是在Dependency中实例化的,如下代码所示,它是调用ShuffleManagerManager的registerShuffle方法完成的
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
ShuffleManagerManager的registerShuffle方法:
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
new BaseShuffleHandle(shuffleId, numMaps, dependency) ///shuffleId以及numMaps
}
分享到:
相关推荐
10. **Spark Shuffle优化**:包括减少shuffle数据量、使用更高效的shuffle机制如Sort-based Shuffle和Hash-based Shuffle,以及合理配置executor内存和并行度,以提高性能。 11. **资源管理和调度**:Spark可以与...
Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。
4. **Spark 1.2.0**:Sort-based Shuffle逐渐成为推荐的Shuffle模式,因为它可以提供更好的磁盘和网络效率。然而,Hash-based Shuffle仍然保留,作为低内存使用场景下的备选方案。 5. **Spark 2.0.0**:进一步优化...
### Spark Shuffle机制详解 #### 一、Spark Shuffle概念与作用 **Spark Shuffle**是指在Spark应用程序执行过程中,数据从一个节点或分区移动到另一个节点或分区的过程。这种数据重分布通常发生在诸如`groupByKey`,...
Spark Shuffle 优化 - 参数调优 1 Spark 是一个基于内存的分布式计算框架,它的核心是shuffle操作。Shuffle 操作是 Spark 中最耗时的操作之一,而 shuffle 优化是Spark 优化的关键。 本文将对 Spark Shuffle 优化...
5. 使用sort-based shuffle代替hash-based shuffle:sort-based shuffle可以提供更好的排序效果,减少数据交换。 以上策略需要结合实际应用场景灵活调整,持续监控Spark应用的性能指标,如executor内存使用情况、...
6. **Shuffle操作优化**:Shuffle是Spark性能瓶颈之一,减少不必要的Shuffle(如通过Coalesce操作)和优化Shuffle管理(如使用更高效的Hash Shuffle或Sort Shuffle)能显著提升性能。 7. **宽依赖与窄依赖**:理解...
《Spark大数据商业实战三部曲》是一套深度探讨Spark技术在商业环境中的应用、内核解析及性能优化的实例源码集合。这份压缩包包含了丰富的实践案例和代码,旨在帮助读者深入理解Spark的核心机制,掌握大数据处理的...
【SparkDemo12】是一个基于Apache Spark的演示项目,它可能是为了展示Spark的核心功能和用法。Apache Spark是一个用于大规模数据处理的开源集群计算系统,以其高性能、易用性和广泛的功能而闻名。在这个Demo中,我们...
在Spark源代码阅读的过程中,我们可以深入理解Spark的工作原理和架构设计,这对于提升开发效率和优化性能至关重要。Spark作为一个分布式计算框架,其核心组件包括:弹性分布式数据集(RDD)、存储系统、调度器、 ...
4. Shuffle操作:数据在不同节点间重新分配的过程,是Spark性能优化的重点之一。 二、Spark组件与应用 1. Spark Core:基础执行引擎,负责任务调度、内存管理等。 2. Spark SQL:提供SQL接口,支持结构化数据处理...
为了优化shuffle过程,Spark使用Hash Shuffling和Sort Shuffling策略,后者通过排序保证了数据的稳定性。 Spark的架构是基于Master-Worker模式,由一个或多个Driver(通常是用户的应用程序)和多个Executor组成。...
shuffle调优则需要理解Spark的shuffle过程,通过调整hash分区策略、增加shuffle write buffer大小等手段,优化数据传输和合并过程。 总的来说,Spark性能优化是一个综合的过程,需要从开发阶段就考虑效率问题,并...
### Spark DataFrame将一列展开,把该列所有值都变成新列的方法 在处理大数据时,Apache Spark 是一个非常强大的工具。特别是在数据处理与分析领域,Spark 的 DataFrame API 提供了丰富的功能来帮助用户高效地操作...
Spark SQL在处理大型数据集的join操作时,采用多种策略,如shuffle hash join、broadcast hash join等,根据数据量和内存可用性选择最优策略。 11. **Window Functions**: Spark SQL支持窗口函数,这在处理时间...
SparkSQL是Apache Spark项目的一部分,它提供了一个用于处理结构化数据的强大工具,允许开发者使用SQL或者DataFrame API来操作数据。SparkSQL集成了Hive,因此它可以读取和写入Hive表,同时支持多种数据源,如HDFS、...
4. 缓存存储:Redis 是一个 key-value 存储系统,支持存储的 value 类型包括 string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和 hash(哈希类型)。Redis 支持 push/pop、add/remove ...
This version combines the previous CPU-based hashcat (now called hashcat-legacy) and GPU-based oclHashcat. Hashcat is released as open source software under the MIT license. Current Version ...
随着技术的发展,美团在Spark Shuffle上引入了内存优化策略,如使用RDMA(Remote Direct Memory Access)技术减少网络传输延迟,以及采用更高效的Hash Partitioner和Bucketing策略降低数据冲突。此外,美团还引入了...
在"js-spark-md5-master"文件中,包含了Spark-MD5库的源代码和其他相关资源。主要文件结构可能包括: 1. `spark-md5.js`:这是Spark-MD5的核心库文件,包含了MD5算法的实现。 2. `spark-md5.min.js`:压缩后的版本...