从WordCount开始分析
编写一个例子程序
编写一个从HDFS中读取并计算wordcount的例子程序:
package org.apache.spark.examples
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
objectWordCount {
def main(args : Array[String]) {
valsc = new SparkContext(args(0), "wordcount by hdfs",
System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass()))
//从hadoophdfs的根路径下得到一个文件
valfile = sc.textFile("/hadoop-test.txt")
valcounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("/newtest.txt")
}
}
生成SparkContext实例
在上面例子中,要执行map/reduce操作,首先需要一个SparkContext,因此看看SparkContext的实例生成
def this(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
{
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
preferredNodeLocationData)
}
编写WordCount例子时使用了上面列出的构造函数,后面两个environment与 preferredNodeLocationData传入为默认值。
调用updatedConf的单例函数,生成或更新当前的SparkConf实例。
调用SparkContext的默认构造函数。
1.生成并启动监控的Jetty ui,SparkUI.
2.生成TaskScheduler实例,并启动。
此函数会根据不同的master name生成不同的TaskScheduler实例。,yarn-cluster为YarnClusterScheduler。
主要用来启动/停止task,监控task的运行状态。
private[spark] vartaskScheduler = SparkContext.createTaskScheduler(this, master, appName)
taskScheduler.start()
3.生成DAGScheduler实例,并启动。
@volatileprivate[spark] vardagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
在scheduler进行start操作后,通过调用postStartHook来把SparkContext添加到appmaster中。
生成WorkerRunnable线程,通过nmclient启动worker对应的container。此container线程CoarseGrainedExecutorBackend的实例,此实例通过Executor实例来加载相关的task。
SparkContext.textFile生成RDD
此方法用来生成RDD的实例,通常读取文本文件的方式通过textFile来进行,并其调用hadoopFile来执行。
通过hadoopFile得到一个HadoopRDD<K,V>的实例后,通过.map得到V的值。并生成RDD返回。
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minSplits).map(pair => pair._2.toString)
}
最终通过hadoopFile函数生成一个HadoopRDD实例。
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
valconfBroadcast = broadcast(newSerializableWritable(hadoopConfiguration))
valsetInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minSplits)
}
RDD函数的抽象执行
reduceByKey需要执行shuffle的reduce,也就是需要多个map中的数据集合到相同的reduce中运行,生成相关的DAG任务
valfile = sc.textFile("/hadoop-test.txt")
valcounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("/newtest.txt")
在以上代码中,textFile,flatMap,map,reduceByKey都是spark中RDD的transformation,
而saveAsTextFile才是RDD中进行执行操作的action.
以下引用http://my.oschina.net/hanzhankang/blog/200275的相关说明:
具体可参见:http://spark.apache.org/docs/0.9.0/scala-programming-guide.html。
1,transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD
2,action是得到一个值,或者一个结果(直接将RDD cache到内存中)
所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。
transformation操作:
map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多个结果
mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index
sample(withReplacement,faction,seed):抽样
union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合
distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数
cartesian(otherDataset):笛卡尔积就是m*n,大家懂的
action操作:
reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的
collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组
count():返回的是dataset中的element的个数
first():返回的是dataset中的第一个元素
take(n):返回前n个elements,这个士driver program返回的
takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed
saveAsTextFile(path):把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中
saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统
countByKey():返回的是key对应的个数的一个map,作用于一个RDD
foreach(func):对dataset中的每个元素都使用func
RDD的action中提交Job
在执行RDD的saveAsTextFile时调用SparkContext.runJob方法
saveAsTextFile方法,--> saveAsHadoopFile,最终调用SparkContext.runJob方法
def saveAsTextFile(path: String) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
......以下一行代码就是在saveASTextFile函数嵌套调用中最终调用的函数,调用SparkContext.runJob
self.context.runJob(self, writeToFile _)
SparkContext.runJob的定义:
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
SparkContext的最终执行runJob函数定义
def runJob[T, U: ClassTag](
rdd: RDD[T],//此处是具体的RDD实例值
func: (TaskContext, Iterator[T]) => U,//具体的执行的action的逻辑,如reduceByKey
partitions: Seq[Int],//分区数组,一个数值从0到partitions.size-1
allowLocal: Boolean,//是否可以在本地执行
//result的处理逻辑,每一个Task的处理
resultHandler: (Int, U) => Unit) {
valcallSite = getCallSite
valcleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
valstart = System.nanoTime
通过DAGScheduler.runJob去执行job的运行操作,请看下面的DAGScheduler处理job提交。
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}
DAGScheduler处理job提交
上面的函数最终通过DagScheduler.runJob进行执行。
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null)
{
valwaiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
等待job运行完成。
waiter.awaitResult() match {
case JobSucceeded => {}
case JobFailed(exception: Exception, _) =>
logInfo("Failed to run " + callSite)
throwexception
}
}
调用DAGShceduler.submitJob来提交任务。
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null): JobWaiter[U] =
{
// Check to make sure we are not launching a task on a partition that does not exist.
valmaxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions).foreach { p =>
thrownew IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
valjobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
returnnew JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
valfunc2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
valwaiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
向akka的actor发送一个event,此event为JobSubmitted,!表示发送消息
eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter
}
在DAGShceduler中的start方法时,会生成如下代码,此代码receive接收eventProcessActor发送的消息并进行处理
def start() {
eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
/**
* A handle to the periodical task, used to cancel the task when the actor is stopped.
*/
varresubmissionTask: Cancellable = _
overridedef preStart() {
import context.dispatcher
/**
* A message is sent to the actor itself periodically to remind the actor to resubmit failed
* stages. In this way, stage resubmission can be done within the same thread context of
* other event processing logic to avoid unnecessary synchronization overhead.
*/
resubmissionTask = context.system.scheduler.schedule(
RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
}
/**
* The main event loop of the DAG scheduler.
*/
接收发送的scheduler事件,并通过processEvent进行处理。
def receive = {
caseevent: DAGSchedulerEvent =>
logTrace("Got event of type " + event.getClass.getName)
/**
* All events are forwarded to `processEvent()`, so that the event processing logic can
* easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
* for details.
*/
if (!processEvent(event)) {
submitWaitingStages()
} else {
resubmissionTask.cancel()
context.stop(self)
}
}
}))
}
processEvent中处理JobSubmitted的处理流程:
以下代码中生成一个finalStage,每一个JOB都有一个finalStage,根据job划分出不同的stage,并且提交stage:
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
varfinalStage: Stage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
// whose underlying HDFS files have been deleted.
finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
} catch {
casee: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
returnfalse
}
valjob = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
如果可以本地运行,同时此finalStage没有stage的依赖关系,同时partitions只有一个。也就是只有一个处理的split
那么这时直接通过localThread的方式来运行此job实例。不通过TaskScheduler进行处理。
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job, Array(), properties))
runLocally(job)
} else {
否则表示partitions有多个,或者stage本身的依赖关系,也就是像reduce这种场景。
根据job对应的stage(finalStage),调用submitStage,通过stage之间的依赖关系得出stage DAG,并以依赖关系进行处理:
idToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))
submitStage(finalStage)
}
submitStage方法处理流程:
private def submitStage(stage: Stage) {
valjobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waiting(stage) && !running(stage) && !failed(stage)) {
valmissing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
running += stage
} else {
for (parent <- missing) {
submitStage(parent)
}
waiting += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id)
}
}
对于一个刚生成的job,此时的stage为刚生成,
此时submitStage调用getMissingParentStages得到stage的parent,也就是RDD的依赖关系
生成parentStage是通过RDD的dependencies来生成相关的RDD的依赖关系,
如果依赖关系是ShuffleDependency,生成一个mapStage来作为finalStage的parent,
否则是NarrowDependency,不生成新的stage.如count,各task没有相关的数据依赖
也就是说,对应需要执行shuffle操作的job,会生成mapStage与finalStage进行,
而不需要shuffle的job只需要一个finalStage
private def getMissingParentStages(stage: Stage): List[Stage] = {
valmissing = new HashSet[Stage]
valvisited = new HashSet[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
if (getCacheLocs(rdd).contains(Nil)) {
for (dep <- rdd.dependencies) {
depmatch {
caseshufDep: ShuffleDependency[_,_] =>
valmapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
casenarrowDep: NarrowDependency[_] =>
visit(narrowDep.rdd)
}
}
}
}
}
visit(stage.rdd)
missing.toList
}
接下来回到submitStage方法中,如果stage没有missing的stage时(没有parent stage),执行task的提交操作。
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
设置当前的stage为running,因为当前的stage没有parent的stage,直接running当前的stage
running += stage
}else {
for (parent <- missing) {
此stage中包含有parent的stage,因此stage需要进行顺序执行。先执行parent的stage.递归调用
submitStage(parent)
}
设置当前的stage为waiting,表示此stage需要等待parent的执行完成。
waiting += stage
}
执行submitMissingTasks流程处理,把stage根据partition生成TaskSet,通过TaskScheduler提交Task.
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
valmyPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
myPending.clear()
vartasks = ArrayBuffer[Task[_]]()
检查stage是否是mapStage,如果是shuffleMapStage,生成ShuffleMapTask,并添加到tasks列表中。
mapStage表示还有其它stage依赖此stage
if (stage.isShuffleMap) {
for (p <- 0 until stage.numPartitionsif stage.outputLocs(p) == Nil) {
得到rdd stage中当前传入的partition的TaskLocation(也就是Task host)
vallocs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
} else {
否则表示是一个finalStage,此类stage直接输出结果,生成ResultTask,并添加到tasks列表中。
// This is a final stage; figure out its job's missing partitions
valjob = resultStageToJob(stage)
for (id <- 0 until job.numPartitionsif !job.finished(id)) {
valpartition = job.partitions(id)
得到rdd stage中当前传入的partition的TaskLocation(也就是Task host)
vallocs = getPreferredLocs(stage.rdd, partition)
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
valproperties = if (idToActiveJob.contains(jobId)) {
idToActiveJob(stage.jobId).properties
} else {
//this stage will be assigned to "default" pool
null
}
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
如果有生成的tasks,也就是此job中有需要执行的task,
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
// down the road, where we have several different implementations for local scheduler and
// cluster schedulers.
try {
SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
} catch {
casee: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString)
running -= stage
return
}
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
执行TaskScheduler.submitTasks处理函数,TaskScheduler的实现在on yarn中为YarnClusterScheduler.
请参见下面的TaskScheduler提交task流程分析
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
running -= stage
}
}
到目前为此,job在DAGScheduler的处理流程完成。等待TaskScheduler处理完数据后,回调DAGScheduler.
TaskScheduler提交task流程分析
TaskScheduler在on yarn模式时,实现为YarnClusterScheduler。提交task时,通过调用submitTasks函数。
YarnClusterScheduler继承与TaskSchedulerImpl.
通过TaskSchedulerImpl.submitTasks对task的提交进行处理。
override def submitTasks(taskSet: TaskSet) {
valtasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
生成一个TaskSetManager实例,并把此实例设置到activeTaskSets的容器中。
在生成实例的过程中,会把taskSet传入,并得到要执行的task个数,
并根据task的location信息,
生成副本执行次数的容器copiesRunning,列表的个数为job中task的个数,所有的列表值为0,表示没有副本执行
把task分别放到pendingTasksForExecutor(process_local)此时没有值,
/pendingTasksForHost(node_local),此时此节点的task全在此里面,host在worker注册时已经存在
/pendingTasksForRack(rack)/,通常情况不会有值
pendingTasksWithNoPrefs(待分配),通常情况不会有值。
allPendingTasks(any)
所有的task都在最后一个中。
valmanager = new TaskSetManager(this, taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
把TaskSetManager添加到rootPool中。
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
针对此TaskSet(job)生成一个跟踪每一个task的容器
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
定时检查taskSet是否被启动,如果没有被启动,提示无资源,如果被启动成功,关闭此检查线程。
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
overridedef run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient memory")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}
hasReceivedTask = true
}
通过backend发起执行消息,backend是SchedulerBackend的具体实现,
在yarn-cluster模式为CoarseGrainedSchedulerBackend。
backend.reviveOffers()
}
CoarseGrainedSchedulerBackend.reviveOffers
通过driverActor的actor实例发起一个ReviveOffers的事件处理消息。
override def reviveOffers() {
driverActor ! ReviveOffers
}
driverActor 的实现为CoarseGrainedSchedulerBackend.DriverActor实例。
DriverActor中处理revive的函数为receive.其中,处理ReviveOffers部分定义如下:
case ReviveOffers =>
makeOffers()
最终调用的makeOffers函数。
def makeOffers() {
executorHost与freeCores的值由来请查看appmaster启动时的补充
launchTasks(scheduler.resourceOffers(
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
通过CoarseGrainedSchedulerBackend对应的scheduler(TaskSchdulerImpl).resourceOffers得到tasks
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
把executor(worker)对应的host存储到对应的容器中,通过executorid拿host
executorIdToHost(o.executorId) = o.host
得到当前所有注册的worker的host,写入到对应的容器中,此容器表示node_local
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
通过DAGScheduler.executorGained把executorId与host进行处理,
请参见下面DAGScheduler中处理ExecutorGained处理。
executorGained(o.executorId, o.host)
}
}
根据所有的worker,根据每一个worker的的cpu core,生成[arraybuffer[]]
// Build a list of tasks to assign to each worker
valtasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
得到每一个worker可用的cpu
valavailableCpus = offers.map(o => o.cores).toArray
得到rootPool中排序后的队列中的所有的TaskSet存储的TaskSetMansger数组
valsortedTaskSets = rootPool.getSortedTaskSetQueue()
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
varlaunchedTask = false
迭代出每一个TaskSetMansger,同时根据每一个TaskSetMansger,
迭代去按网络的优先级执行PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
scala中的for如果包含多个执行器,也就是<-的表达式,多个用;号分开,后面一个优先前面一个执行
也就是后一个执行完成后,相当于一个嵌套的for
此处开始执行对taskSet的执行节点选择,针对每一个taskset,首先使用PROCESS_LOCAL开始。
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
迭代所有的worker,并在每迭代出一个worker时,在此机器上生成执行taskSet中对应的相关task
针对TaskSetmanager.resourceOffer的处理流程,见后面的细节分析,现在不分析此实现。
launchedTask = false
for (i <- 0 until offers.size) {
valexecId = offers(i).executorId
valhost = offers(i).host
生成task执行的节点信息等,每次执行 resourceOffer生成一个TaskDescription
把task对应的executorid与host添加到对应的activeExecutorIds 与executorsByHost。
for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
tasks(i) += task
valtid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskSetTaskIds(taskSet.taskSet.id) += tid
taskIdToExecutorId(tid) = execId
此时把activeExecutorIds的值添加一个正在执行的executor,这个值的作用是当有多个stage的依赖时,
下一个stage在执行submitTasks时,
生成的TaskSetManager中会把新stage对应的task的executor直接使用此executor,也就是 PROCESS_LOCAL.
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= 1
launchedTask = true
}
}
通TaskLocality,如果在一个较小的locality时找到一个task,从这个locality中接着找,
否则跳出去从下一个locality重新找,放大locality的查找条件。
如果launchedTask的值为true,表示在传入的locality级别上查找到task要执行对应的级别,
那么在当前级别下接着去找到下一个可执行的TASK,否则launchedTask的值为false,放大一个locality的级别。
如launchedTask的值为false,当前迭代的locality的级别为PROCESS_LOCAL,那么把级别放大到NODE_LOCAL重新查找.
} while (launchedTask)
}
如果tasks生成成功,设置hasLaunchedTask的值为true,前面我们提到过的submitTasks中的检查线程开始结束。
if (tasks.size > 0) {
hasLaunchedTask = true
}
返回生成成功的task列表。交给CoarseGrainedSchedulerBackend.launchTasks处理
returntasks
}
CoarseGrainedSchedulerBackend.launchTasks处理流程:
通过worker注册的actor,向CoarseGrainedExecutorBackend发送消息,处理LaunchTask事件
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
executorActor(task.executorId) ! LaunchTask(task)
}
}
CoarseGrainedExecutorBackend中处理LaunchTask事件事件。
override def receive = {
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
通过executor执行task,见后面的分析。
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
TaskSetManager.resourceOffer函数,每次执行得到一个task的执行节点。
def resourceOffer(
execId: String,
host: String,
availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
如果成功的task个数小于当前的job要执行的task的个数,
同时worker中可用的cpu资源需要大于或等于spark.task.cpus配置的值,默认需要大于或等于1.
if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) {
valcurTime = clock.getTime()
得到一个默认的locality的值,默认情况下最有可能是NODE_LOCAL.
此处根据上一次查找可执行节点的时间,得到一个合适此执行时间的一个locality级别。
通过spark.locality.wait配置全局的等待时间。默认为3000ms。作用于PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL
通过spark.locality.wait.process配置PROCESS_LOCAL的等待时间。
通过spark.locality.wait.node配置NODE_LOCAL的等待时间。
通过spark.locality.wait.rack配置RACK_LOCAL的等待时间。
这里的查找方式是通过当前的currentLocalityIndex的值,默认从0开始,找到对应可执行的级别,
检查当前时间减去上次的查找级别的执行时间是否大于上面配置的在此级别的执行时间,
如果大于配置的时间,把currentLocalityIndex的值+1重新检查,返回一个合适的locality级别。
如果执行查找的时间超过了以上配置的几个locality的级别的查找时间,此时返回的值为ANY.
varallowedLocality = getAllowedLocalityLevel(curTime)
首先把当前可执行的locality设置为PROCESS_LOCAL.maxLocality是最大的级别,
得到的可执行级别不能超过此级别,从PROCESS_LOCAL开始一级一级向上加大。
maxLocality的级别从PROCESS_LOCAL一级一级向上加,
如果getAllowedLocalityLevel查找到的级别大于现在传入的级别。把级别设置为传入的级别。
maxLocality传入按PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL/ANY进行传入。
if (allowedLocality > maxLocality) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
}
通过findTask来得到task对应的执行网络选择。
见下面的TaskSetManager.findTask选择task的执行节点的流程部分
findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
// Found a task; do some bookkeeping and return a task description
valtask = tasks(index)
valtaskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
taskSet.id, index, taskId, execId, host, taskLocality))
设置task的执行副本加一,
// Do various bookkeeping
copiesRunning(index) += 1
valinfo = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
得到当前加载的节点执行级别的index,并更新当前查找此执行节点的查找时间为当前时间。
// Update our locality level for delay scheduling
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
// Serialize and return the task
valstartTime = clock.getTime()
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
// we assume the task can be serialized without exceptions.
valserializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
valtimeTaken = clock.getTime() - startTime
把task添加到runningTasksSet的容器中。
addRunningTask(taskId)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
valtaskName = "task %s:%d".format(taskSet.id, index)
如果task的执行尝试的值为1,表示是第一次尝试执行,通过DAGScheduler触发BeginEvent事件。
if (taskAttempts(index).size == 1)
taskStarted(task,info)
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}
case _ =>
}
}
None
}
TaskSetManager.findTask选择task的执行节点的流程部分:
从不同的locality级别中取出需要执行的task.
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
此处优先找PROCESS_LOCAL的值,但是我现在还没有搞明白这个pendingTasksForExecutor的值从何来。
从TaskSetManager生成时可以看出pendingTasksForExecutor的值在实例生成时,
从TaskSchedulerImpl.activeExecutorIds中检查并生成。但实例生成此,此容器还没有值。这点还没搞明白。
新的批注:
当有stage的依赖关系时,第一个stage执行完成后,activeExecutorIds的容器会有执行过的executor列表。
对上一个stage执行完成后,新的一个stage开始执行,
生成的TaskSetManager中pendingTasksForExecutor中包含可以直接使用上一个stage中部分task执行的executor的task.
因此,如果有stage的依赖关系时,下一个stage中的task在此时如果executorid相同,直接使用PROCESS_LOCAL来执行。
第一个stage执行时,PROCESS_LOCAL不会被选择,正常情况locality的选择会放大的NODE_LOCAL开始。
for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality)
}
appmaster启动时的补充
一些需要的说明:在makeOffers中调用了TaskScheduler.resourceOffers函数,
此函数中传入的executorHost,freeCores的值什么时候得到呢:
我们知道在appmaster启动的时候。会根据设置的num-worker个数,向rm申请worker运行的资源,
并通过WorkerRunnable启动worker对应的container。启动CoarseGrainedExecutorBackend实例在container中.
在实例中连接appmaster对应的sparkContext中的scheduler中的CoarseGrainedSchedulerBackend.DriverActor
此DriverActor的name为CoarseGrainedSchedulerBackend.ACTOR_NAME.
如下是CoarseGrainedExecutorBackend生成的一些代码片段:
YarnAllocationHandler.allocateResources中得到actor的名称。
val workerId = workerIdCounter.incrementAndGet().toString
valdriverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
YarnAllocationHandler.allocateResources通过WorkerRunnable的线程启动worker的container
valworkerRunnable = new WorkerRunnable(
container,
conf,
sparkConf,
driverUrl,
workerId,
workerHostname,
workerMemory,
workerCores)
new Thread(workerRunnable).start()
在CoarseGrainedExecutorBackend实例启动时,向actor注册。
overridedef preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
发起worker启动时注册Executor的消息。
driver ! RegisterExecutor(executorId, hostPort, cores)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
CoarseGrainedSchedulerBackend中发起RegisterExecutor的事件处理。
def receive = {
case RegisterExecutor(executorId, hostPort, cores) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
如果此executorActor中已经包含有发送此消息过来的actor,表示此worker已经注册,
通过发送消息过来的actor(sender表示发送此消息的actor)发送一个RegisterExecutorFailed事件。
if (executorActor.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
否则表示actor(worker)还没有被注册,把actor添加到executorActor中,
同时向发送消息过来的actor(sender表示发送此消息的actor)发送一个RegisteredExecutor消息.
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender
添加在TaskScheduler中提交task时使用的executorHost,与freeCores
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
把现在注册的所有的节点添加到TaskScheduler.executorsByHost中。在生成TaskSetManager是会使用
makeOffers()
}
CoarseGrainedExecutorBackend中接收appmaster中scheduler的receive.
针对 RegisteredExecutor与 RegisterExecutorFailed的处理流程:
receive函数中处理RegisterExecutorFailed:如果已经存在,直接exit掉此jvm.
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
receive函数中处理RegisteredExecutor:如果不存在,生成Executor实例。此时worker启动完成,
并向master注册成功。
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
RDD的依赖关系
..........
Spark中的Scheduler
........
Task的执行过程
相关推荐
三、WordCount代码分析 1. **Mapper阶段**:Mapper类通常是实现`org.apache.hadoop.mapreduce.Mapper`接口。在这个阶段,Mapper读取输入数据(通常是文本文件),将每一行拆分成单词,然后为每个单词生成一个键值对...
job.setJarByClass(WordCount.class); job.setMapperClass(MapClass.class); job.setCombinerClass(ReduceClass.class); job.setReducerClass(ReduceClass.class); job.setOutputKeyClass(Text.class); job....
- **spark-defaults.conf**: Spark 提交 Job 时的默认配置文件,用于定义 Spark 的全局配置参数,例如 Executor 的内存大小等。 - **spark-env.sh**: Spark 的环境变量配置文件,可以设置一些特定于系统的环境变量,...
### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...
- Job、Stage 和 Task:Job 是用户提交的计算任务,由多个 Stage 组成,Stage 再拆分为可并行执行的 Task。 - Shuffle:数据重新分布的过程,发生在任务执行时,通常在聚合操作或连接操作中发生。 - Resilient ...
- **提交Spark作业**: 假设你有一个Scala或Python编写的Spark应用程序 `app.py`: ``` spark-submit --class "com.example.App" --master local[2] /path/to/app.jar ``` 或者使用Python脚本: ``` spark...
`WordCount`是驱动程序,它设置输入和输出路径,初始化Job并提交到Hadoop集群。`WordCountMapper`接收文本行,使用分隔符(如空格)将其拆分为单词,并生成键值对(单词,1)。`WordCountReducer`接收Mapper输出的...
- Hadoop需要通过多次MapReduce作业才能完成排序,而Spark则可以通过单个Job完成,这主要是因为Spark内部支持更为高效的Shuffle机制。 - 随着数据量的增大,Spark的性能优势更加明显,特别是在大规模数据集上的...
火花示例./spark/bin/spark-submit...0.0.1-SNAPSHOT-job-0407.jar -p AWS -in s3n://aws-publicdatasets/common-crawl/crawl-数据/CC-MAIN-2014-10/segments/1394678706211/wat/ -out s3n://my-path/wen/sparkexample_
《Spark大数据处理:技术、应用与性能优化》是高彦杰撰写的一本深入解析Spark技术的书籍,旨在帮助读者理解Spark的核心概念、集群部署、计算模型、工作机制,并提供丰富的编程实践指导。这本书对于程序员和大数据...
在实际应用中,WordCount可以扩展到更复杂的场景,例如在大数据分析、日志处理、搜索引擎索引构建等领域,它的思想也被广泛应用于其他分布式计算框架,如Spark和Flink。理解并掌握WordCount的原理和实现,对于理解...
在本实验报告中,主要涉及了使用MapReduce和Spark两种技术进行数据分析,目的是统计双十一期间的热门商品和年轻人关注的商家。以下是对这些知识点的详细解释: **MapReduce统计最热门商品** MapReduce是一种分布式...
`main`方法则配置并提交了Job,指定输入和输出路径。 在实际操作中,`wordcount-master`这个压缩包可能包含了完整的项目结构,包括源代码、配置文件以及运行和测试WordCount程序所需的所有资源。解压后,你可以在...
3. **Driver Program**:这是整个流程的起点,它配置Job并提交到Hadoop集群执行。 ```java public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = ...
2. 创建名为 `wc.input` 的文件,包含一系列单词,例如 "hadoop mapreduce hive spark" 等,这是 MapReduce 作业的输入数据。 3. 检查文件内容,确保数据已正确录入。 **任务二:将输入文件上传到 HDFS** 1. 使用 ...
在这个过程中,Job是提交执行的应用程序,而Task是从Job中拆分出的工作单元。 2. **Hadoop**:Hadoop是实现MapReduce编程模型的一个开源框架,它包括两个核心组件:HDFS(分布式文件系统)和MapReduce。Hadoop采用...
从给出的文件信息中,我们可以看出,这些内容是关于华为大数据认证HCIP-Big Data Developer H13-723考试题库的知识点,接下来我将对这些知识点进行详细解析。 1. HBase数据写入接口类:在HBase中写入数据时,我们...
手写代码章节详细介绍了常见的算法和数据结构的实现,如冒泡排序、二分查找、快速排序、归并排序、二叉树以及基于Scala的Spark-WordCount实现。这些算法是面试中考察算法能力的常见问题,掌握这些算法对于通过技术...
job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text....