`
hongs_yang
  • 浏览: 61023 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

spark shuffle部分分析

阅读更多

spark shuffle流程分析

 

回到ShuffleMapTask.runTask函数

 

现在回到ShuffleMapTask.runTask函数中:

 

override def runTask(context: TaskContext): MapStatus = {

 

首先得到要reducetask的个数。

 

valnumOutputSplits = dep.partitioner.numPartitions

 

metrics = Some(context.taskMetrics)

 

 

 

valblockManager = SparkEnv.get.blockManager

 

valshuffleBlockManager = blockManager.shuffleBlockManager

 

varshuffle: ShuffleWriterGroup = null

 

varsuccess = false

 

 

 

try {

 

得到对数据时行serializer操作的类,

 

// Obtain all the block writers for shuffle blocks.

 

valser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)

 

通过shuffleid与要进行reducetask个数,生成ShuffleBlockId

 

同时根据blockid生成ShuffleWriterGroup.shuffle的实现为DiskBlockObjectWriter

 

通过spark.shuffle.consolidateFiles配置是否合并文件的输入。默认的为false,

 

合并文件设置为true,下次再有task在本机运行时,会直接打开当前输入的文件进行输入。

 

shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

 

根据rdditerator取出数据,根据elementkey重新进行partition,重新写入到shuffle

 

// Write the map output to its associated buckets.

 

for (elem <- rdd.iterator(split, context)) {

 

valpair = elem.asInstanceOf[Product2[Any, Any]]

 

valbucketId = dep.partitioner.getPartition(pair._1)

 

每一个partition都对应着一个DiskBlockObjectWriter, 通过此实例的write函数,写入shuffle的数据。

 

也就是说,这个时候此RDD远行的task个数为core的个数,此时打开的文件个数为corenum*numpartition

 

shuffle.writers(bucketId).write(pair)

 

}

 

 

 

// Commit the writes. Get the size of each bucket block (total block size).

 

vartotalBytes = 0L

 

vartotalTime = 0L

 

把这次打开的所有的文件全部commit,同时关闭文件的输入。

 

valcompressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>

 

writer.commit()

 

writer.close()

 

valsize = writer.fileSegment().length

 

totalBytes += size

 

totalTime += writer.timeWriting()

 

MapOutputTracker.compressSize(size)

 

}

 

 

 

// Update shuffle metrics.

 

valshuffleMetrics = new ShuffleWriteMetrics

 

shuffleMetrics.shuffleBytesWritten = totalBytes

 

shuffleMetrics.shuffleWriteTime = totalTime

 

metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

 

 

 

success = true

 

new MapStatus(blockManager.blockManagerId, compressedSizes)

 

} catch { casee: Exception =>

 

// If there is an exception from running the task, revert the partial writes

 

// and throw the exception upstream to Spark.

 

if (shuffle != null && shuffle.writers != null) {

 

for (writer <- shuffle.writers) {

 

writer.revertPartialWrites()

 

writer.close()

 

}

 

}

 

throwe

 

} finally {

 

// Release the writers back to the shuffle block manager.

 

if (shuffle != null && shuffle.writers != null) {

 

shuffle.releaseWriters(success)

 

}

 

// Execute the callbacks on task completion.

 

context.executeOnCompleteCallbacks()

 

}

 

}

 

 

 

关于SparkEnv

 

ShuffleMapTask.runTask中开始就通过SparkEnv.get去获取SparkEnv里面的内容,

 

SparkEnv中主要通过ThreadLocal来存储此实例,

 

此实例中包含Akka actor,serializer,BlockManager,shuffle使用的MapoutputTracker等。

 

SparkEnv实例生成包含两部分,masterworker,

 

master是在sparkcontext生成时生成,worker是在executor生成时生成

 

因此现在我们来分析下这个类定义

 

 

 

针对每一个Worker中的executor会生成一个SparkEnv实例:

 

Executor实例生成时,会执行发下代码:

 

设置当前executor的属性env为创建一个SparkEnv实例,此实例通过当前的executorId与当前的host生成。

 

private val env = {

 

if (!isLocal) {

 

val_env = SparkEnv.create(conf, executorId, slaveHostname, 0,

 

isDriver = false, isLocal = false)

 

SparkEnv.set(_env)

 

_env.metricsSystem.registerSource(executorSource)

 

_env

 

} else {

 

SparkEnv.get

 

}

 

}

 

 

 

针对master启动时生成的SparkEnv实例:

 

通过在生成SparkContext实例时,生成SparkEnv属性:

 

private[spark] val env = SparkEnv.create(

 

conf,

 

//注意:此处使用的是driver,表示这是一个driver程序(master),worker时这里传入的是具体的executorid

 

"<driver>",

 

conf.get("spark.driver.host"),

 

conf.get("spark.driver.port").toInt,

 

isDriver = true,

 

isLocal = isLocal)

 

SparkEnv.set(env)

 

 

 

生成的env实例,此实例是一个线程本地实例,每一个线程都有自己独立的SparkEnv

 

private val env = new ThreadLocal[SparkEnv]

 

声明可变的变量,用来存储最后变化的实例,通过sparkEnv.get时如果env不存在,会拿这个值

 

@volatileprivatevarlastSetSparkEnv : SparkEnv = _

 

 

 

def set(e: SparkEnv) {

 

lastSetSparkEnv = e

 

env.set(e)

 

}

 

 

 

def get: SparkEnv = {

 

Option(env.get()).getOrElse(lastSetSparkEnv)

 

}

 

 

 

下面是sparkenvcreate函数:

 

 

 

private[spark] def create(

 

conf: SparkConf,

 

executorId: String,

 

hostname: String,

 

port: Int,

 

isDriver: Boolean,

 

isLocal: Boolean): SparkEnv = {

 

 

 

val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port,

 

conf = conf)

 

 

 

// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),

 

// figure out which port number Akka actually bound to and set spark.driver.port to it.

 

if (isDriver && port == 0) {

 

conf.set("spark.driver.port", boundPort.toString)

 

}

 

 

 

valclassLoader = Thread.currentThread.getContextClassLoader

 

 

 

// Create an instance of the class named by the given Java system property, or by

 

// defaultClassName if the property is not set, and return it as a T

 

def instantiateClass[T](propertyName: String, defaultClassName: String): T = {

 

valname = conf.get(propertyName, defaultClassName)

 

Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]

 

}

 

生成一个Serializermanager实例

 

valserializerManager = new SerializerManager

 

得到配置的Serializer实例,这个地方有部分资料建议配置为org.apache.spark.serializer.KryoSerializer.

 

请参见http://spark.apache.org/docs/0.9.0/tuning.html的说明。

 

valserializer = serializerManager.setDefault(

 

conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)

 

闭包使用的serializer,如果闭包中函数使用了大量的对象,可修改默认的值

 

valclosureSerializer = serializerManager.get(

 

conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),

 

conf)

 

此部分检查是否是driver(也就是是否是master)

 

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {

 

如果是master时,生成一个actor的实例,

 

if (isDriver) {

 

logInfo("Registering " + name)

 

actorSystem.actorOf(Props(newActor), name = name)

 

} else {

 

否则表示是worker,生成一个actor的引用。对指定的actor进行连接,生成actorref

 

valdriverHost: String = conf.get("spark.driver.host", "localhost")

 

valdriverPort: Int = conf.getInt("spark.driver.port", 7077)

 

Utils.checkHost(driverHost, "Expected hostname")

 

valurl = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"

 

valtimeout = AkkaUtils.lookupTimeout(conf)

 

logInfo(s"Connecting to $name: $url")

 

Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)

 

}

 

}

 

此处生成BlockManagerMaster实例,如果是driver时,

 

会生成一个名称为BlockManagerMasterBlockManagerMasterActor实例。

 

否则表示是worker,生成BlockManagerMaster,并创建与master中的BlockManagerMasterActoractorref引用。

 

BlockManagerMasterActor中通过配置spark.storage.blockManagerTimeoutIntervalMs,默认值为60000ms

 

定期检查上面注册的BlockManagerId是否过期。

 

valblockManagerMaster = new BlockManagerMaster(registerOrLookup(

 

"BlockManagerMaster",

 

new BlockManagerMasterActor(isLocal, conf)), conf)

 

生成BlockManager,BlockManager中会生成ShuffleBlockManager,DiskBlockManager,memory/diskstore.

 

针对此BlockManager,生成一个BlockManagerId实例,

 

通过masteractor(BlockManagerMasterActor),master注册此block,并定期向master发送心跳。

 

心跳的发送通过spark.storage.blockManagerTimeoutIntervalMs配置的值/4

 

valblockManager = new BlockManager(executorId,

 

actorSystem, blockManagerMaster, serializer, conf)

 

 

 

valconnectionManager = blockManager.connectionManager

 

 

 

valbroadcastManager = new BroadcastManager(isDriver, conf)

 

生成CacheManager,

 

valcacheManager = new CacheManager(blockManager)

 

生成MapOutputTracker,如果是master时,生成 MapOutputTrackerMaster,否则生成 MapOutputTracker

 

// Have to assign trackerActor after initialization as MapOutputTrackerActor

 

// requires the MapOutputTracker itself

 

valmapOutputTracker = if (isDriver) {

 

new MapOutputTrackerMaster(conf)

 

} else {

 

new MapOutputTracker(conf)

 

}

 

如果是master时,生成 MapOutputTrackerMasterActor实例,否则生成对actor的引用。

 

mapOutputTracker.trackerActor = registerOrLookup(

 

"MapOutputTracker",

 

new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]))

 

生成ShuffleFetcher的实例,通过spark.shuffle.fetcher 配置,默认为BlockStoreShuffleFetcher

 

valshuffleFetcher = instantiateClass[ShuffleFetcher](

 

"spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")

 

 

 

valhttpFileServer = new HttpFileServer()

 

httpFileServer.initialize()

 

conf.set("spark.fileserver.uri", httpFileServer.serverUri)

 

 

 

valmetricsSystem = if (isDriver) {

 

MetricsSystem.createMetricsSystem("driver", conf)

 

} else {

 

MetricsSystem.createMetricsSystem("executor", conf)

 

}

 

metricsSystem.start()

 

 

 

// Set the sparkFiles directory, used when downloading dependencies. In local mode,

 

// this is a temporary directory; in distributed mode, this is the executor's current working

 

// directory.

 

valsparkFilesDir: String = if (isDriver) {

 

Utils.createTempDir().getAbsolutePath

 

} else {

 

"."

 

}

 

 

 

// Warn about deprecated spark.cache.class property

 

if (conf.contains("spark.cache.class")) {

 

logWarning("The spark.cache.class property is no longer being used! Specify storage " +

 

"levels using the RDD.persist() method instead.")

 

}

 

 

 

new SparkEnv(

 

executorId,

 

actorSystem,

 

serializerManager,

 

serializer,

 

closureSerializer,

 

cacheManager,

 

mapOutputTracker,

 

shuffleFetcher,

 

broadcastManager,

 

blockManager,

 

connectionManager,

 

httpFileServer,

 

sparkFilesDir,

 

metricsSystem,

 

conf)

 

}

 

 

 

ShuffleBlockManager.forMapTask函数

 

shuffleBlockManager.forMapTask函数是shufflemaptask运行shuffle的核心函数,

 

此函数中会生成ShuffleWriterGroup实例,

 

并根据运行的task个数,通常是cpu core个数*reducepartitionshuffle个文件,每一次的运行都会生成这么多个文件。

 

因此这部分会同时打开core*reduceparitionnumfile,每一个的maptask运行都会生成这么多个文件。

 

此部分完成后就会产生大量的map output文件个数,总文件个数为maptasknum*reducetasknum个文件。

 

同时spark中为了控制文件的生成个数,可通过spark.shuffle.consolidateFiles配置是否重用write文件。默认为false,

 

如果此值设置为true,每一个worker通常只生成core*reducetasknum个文件。

 

每一个文件打开通过spark.shuffle.file.buffer.kb配置的缓存大小。默认为100kb,也就是一次运行中

 

每一个worker中会有core*reducetasknum*100kb的内存buffer的使用。由这部分我个人认为,

 

这玩意还是不合适maptask的任务太多的分析任务。Mapreduceshuffle从性能上会比这要慢一些,

 

但是从对大数据量的支持上还是要好一些。

 

函数定义

 

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {

 

生成一个ShuffleWriterGroup实例

 

new ShuffleWriterGroup {

 

shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))

 

privatevalshuffleState = shuffleStates(shuffleId)

 

privatevarfileGroup: ShuffleFileGroup = null

 

如果spark.shuffle.consolidateFiles配置的值为true,检查是否有上次生成的writer文件,重新打开这个文件。

 

也就是在文件中进行append操作。

 

valwriters: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {

 

fileGroup = getUnusedFileGroup()

 

Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>

 

valblockId = ShuffleBlockId(shuffleId, mapId, bucketId)

 

blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)

 

}

 

} else {

 

否则每一个task都会生成新的writer文件。

 

Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>

 

valblockId = ShuffleBlockId(shuffleId, mapId, bucketId)

 

此处主要是通过sparkenv中的diskBlockMangaer来在指定的路径下生成文件。

 

路径通过spark.local.dir配置。默认为java.io.tmpdir

 

valblockFile = blockManager.diskBlockManager.getFile(blockId)

 

// Because of previous failures, the shuffle file may already exist on this machine.

 

// If so, remove it.

 

if (blockFile.exists) {

 

if (blockFile.delete()) {

 

logInfo(s"Removed existing shuffle file $blockFile")

 

} else {

 

logWarning(s"Failed to remove existing shuffle file $blockFile")

 

}

 

}

 

blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)

 

}

 

}

 

这个函数在shuffleMapTask执行完成的时候调用。如果上面提到的配置为true时,

 

会把writerblockfile放到一个容器中,下一次task运行时,会直接打开这个blockfile文件。

 

overridedef releaseWriters(success: Boolean) {

 

if (consolidateShuffleFiles) {

 

if (success) {

 

valoffsets = writers.map(_.fileSegment().offset)

 

fileGroup.recordMapOutput(mapId, offsets)

 

}

 

recycleFileGroup(fileGroup)

 

} else {

 

shuffleState.completedMapTasks.add(mapId)

 

}

 

}

 

 

 

privatedef getUnusedFileGroup(): ShuffleFileGroup = {

 

valfileGroup = shuffleState.unusedFileGroups.poll()

 

if (fileGroup != null) fileGroupelse newFileGroup()

 

}

 

 

 

privatedef newFileGroup(): ShuffleFileGroup = {

 

valfileId = shuffleState.nextFileId.getAndIncrement()

 

valfiles = Array.tabulate[File](numBuckets) { bucketId =>

 

valfilename = physicalFileName(shuffleId, bucketId, fileId)

 

blockManager.diskBlockManager.getFile(filename)

 

}

 

valfileGroup = new ShuffleFileGroup(fileId, shuffleId, files)

 

shuffleState.allFileGroups.add(fileGroup)

 

fileGroup

 

}

 

 

 

privatedef recycleFileGroup(group: ShuffleFileGroup) {

 

shuffleState.unusedFileGroups.add(group)

 

}

 

}

 

}

 

 

 

DAGShuduler中注册shuffleidmapStatus

 

DAGSheduler的调度中,启动一个stage时,如果是shuffle stage,会执行如下代码:

 

DAGsheduler.runjob-->submitJob-->JobSubmitted actor-->

 

newStage传入参数getParentStages-->getShuffleMapStage-->newOrUsedStage

 

 

 

private def newOrUsedStage(

 

rdd: RDD[_],

 

numTasks: Int,

 

shuffleDep: ShuffleDependency[_,_],

 

jobId: Int,

 

callSite: Option[String] = None)

 

: Stage =

 

{

 

valstage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)

 

if (mapOutputTracker.has(shuffleDep.shuffleId)) {

 

valserLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)

 

vallocs = MapOutputTracker.deserializeMapStatuses(serLocs)

 

for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))

 

stage.numAvailableOutputs = locs.size

 

} else {

 

master中注册此shuffleid

 

// Kind of ugly: need to register RDDs with the cache and map output tracker here

 

// since we can't do it in the RDD constructor because # of partitions is unknown

 

logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")

 

mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)

 

}

 

stage

 

}

 

 

 

回到dagsheduler的调度中,当shuffle的所有的task处理完成后,会调用如下代码:

 

....

 

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

 

.....

 

case smt: ShuffleMapTask =>

 

valstatus = event.result.asInstanceOf[MapStatus]

 

valexecId = status.location.executorId

 

logDebug("ShuffleMapTask finished on " + execId)

 

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {

 

logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)

 

} else {

 

第一个task完成后,都会把map返回的MapStatus(记录有location信息)记录到stageoutputloc中。

 

stage.addOutputLoc(smt.partitionId, status)

 

}

 

if (running.contains(stage) && pendingTasks(stage).isEmpty) {

 

markStageAsFinished(stage)

 

logInfo("looking for newly runnable stages")

 

logInfo("running: " + running)

 

logInfo("waiting: " + waiting)

 

logInfo("failed: " + failed)

 

if (stage.shuffleDep != None) {

 

.........................................

 

如果所有的shuffletask都执行完成,把此stage对应的shuffled与所有的location注册到mapOutputTracker

 

此处是通过DAGSheculer来完成的,因此,mapoutputtracker是一个MapOutputTrackerMaster的实现。

 

mapOutputTracker.registerMapOutputs(

 

stage.shuffleDep.get.shuffleId,

 

stage.outputLocs.map(list => if (list.isEmpty) nullelse list.head).toArray,

 

changeEpoch = true)

 

}

 

 

 

Shuffle的读取计算

 

 

 

此时shuffleMAP RDD执行完成后,会通过PairRDDFunctions来做处理

 

回到PairRDDFunctions中的reduceByKey

 

reduceByKey-->combineByKey

 

再次来看这个函数的定义

 

def combineByKey[C](createCombiner: V => C,

 

mergeValue: (C, V) => C,

 

mergeCombiners: (C, C) => C,

 

partitioner: Partitioner,

 

mapSideCombine: Boolean = true,

 

serializerClass: String = null): RDD[(K, C)] = {

 

if (getKeyClass().isArray) {

 

if (mapSideCombine) {

 

thrownew SparkException("Cannot use map-side combining with array keys.")

 

}

 

if (partitioner.isInstanceOf[HashPartitioner]) {

 

thrownew SparkException("Default partitioner cannot partition array keys.")

 

}

 

}

 

valaggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

 

如果当前的RDDpartitioner与传入的partitioner相等,表示是一个map,不需要进行shuffle,直接在map端合并。

 

if (self.partitioner == Some(partitioner)) {

 

self.mapPartitionsWithContext((context, iter) => {

 

new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))

 

}, preservesPartitioning = true)

 

} elseif (mapSideCombine) {

 

如果设置有在map端先进行一次合并,类似于mapreduce中的combine,先在map端执行一次合并,

 

并生成MapPartitionsRDD

 

valcombined = self.mapPartitionsWithContext((context, iter) => {

 

aggregator.combineValuesByKey(iter, context)

 

}, preservesPartitioning = true)

 

生成一个ShuffledRDD实例,在reduce端执行合并操作。合并的核心函数是aggregator实例中定义的相关函数。

 

valpartitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)

 

.setSerializer(serializerClass)

 

partitioned.mapPartitionsWithContext((context, iter) => {

 

new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))

 

}, preservesPartitioning = true)

 

} else {

 

不执行combiner操作,直接在reduce端进行shuffle操作。

 

// Don't apply map-side combiner.

 

valvalues = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)

 

values.mapPartitionsWithContext((context, iter) => {

 

new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))

 

}, preservesPartitioning = true)

 

}

 

}

 

 

 

Reduce端,生成为ShuffledRDD。数据计算函数通过compute函数完成。

 

ShuffledRDD中计算函数的实现

 

override def compute(split: Partition, context: TaskContext): Iterator[P] = {

 

valshuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId

 

通过指定的shuffledid,拿到shuffle完成的数据。

 

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context,

 

SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf))

 

}

 

 

 

SparkEnv中拿到shuffleFetcher的实例。从SparkEnv生成来看,

 

通过spark.shuffle.fetcher 配置,默认为BlockStoreShuffleFetcher

 

Sparkenv中的定义

 

valshuffleFetcher = instantiateClass[ShuffleFetcher](

 

"spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")

 

 

 

 

 

BlockStoreShuffleFetcher.fetch的函数:

 

override def fetch[T](

 

shuffleId: Int,

 

reduceId: Int,

 

context: TaskContext,

 

serializer: Serializer)

 

: Iterator[T] =

 

{

 

 

 

logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))

 

valblockManager = SparkEnv.get.blockManager

 

 

 

valstartTime = System.currentTimeMillis

 

executor中的mapoutputtracker会通过GetMapOutputStatuses事件

 

mapoutputtrackermaster中的MapOutputTrackerMasterActor发起得到所有的mapStatus事件。

 

valstatuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

 

...........................

 

valsplitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]

 

BlockManagerid相同的map结果进行合并,index的值就是mappartition

 

for (((address, size), index) <- statuses.zipWithIndex) {

 

splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))

 

}

 

得到每一个map的输出文件的结果集地址,地址由shuffleid,mappartitionnum,reduceparttion组成。

 

valblocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {

 

case (address, splits) =>

 

(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))

 

}

 

 

 

def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {

 

valblockId = blockPair._1

 

valblockOption = blockPair._2

 

blockOptionmatch {

 

case Some(block) => {

 

block.asInstanceOf[Iterator[T]]

 

}

 

case None => {

 

blockIdmatch {

 

case ShuffleBlockId(shufId, mapId, _) =>

 

valaddress = statuses(mapId.toInt)._1

 

thrownew FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)

 

case _ =>

 

thrownew SparkException(

 

"Failed to get block " + blockId + ", which is not a shuffle block")

 

}

 

}

 

}

 

}

 

通过blockManagerblockid中获取Iterator,用来得到数据

 

这里的blockManagerreduce进行shuffle的具体有两个实现,默认为BasicBlockFetcherIterator

 

如果spark.shuffle.use.netty配置为true时,实现类为NettyBlockFetcherIterator

 

BasicBlockFetcherIterator中通过nio的方式使用sparkenv中的ConnectionManager来接收数据,

 

NettyBlockFetcherIterator通过netty的通信框架进行操作,使用netty时,

 

通过reducespark.shuffle.copier.threads配置的线程数来获取数据,默认的线程个数为6.

 

valblockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)

 

取出每一个blockid中的values部分的iterator.

 

valitr = blockFetcherItr.flatMap(unpackBlock)

 

 

 

valcompletionIter = CompletionIterator[T, Iterator[T]](itr, {

 

valshuffleMetrics = new ShuffleReadMetrics

 

shuffleMetrics.shuffleFinishTime = System.currentTimeMillis

 

shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime

 

shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime

 

shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead

 

shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks

 

shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks

 

shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks

 

context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)

 

})

 

 

 

new InterruptibleIterator[T](context, completionIter)

 

}

 

 

 

通过MapOutputTracker得到shufflestagemap完成的mapstatus

 

上面得到MapStatus的容器的函数定义

 

def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {

 

检查executor本地是否有此shuffleidmapstatuses信息,

 

valstatuses = mapStatuses.get(shuffleId).orNull

 

如果本地还没有shuffle的状态数据(所有的shuffle完成的状态都需要从master中同步过来)

 

if (statuses == null) {

 

logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")

 

varfetchedStatuses: Array[MapStatus] = null

 

出于线程安全考虑,

 

fetching.synchronized {

 

如果shuffleid已经在fetching中存在,等待shufflemaster获取MapStatus完成。

 

这里主要是为了多个task同时来获取数据,第一个task已经向master发起申请,

 

第二个就不需要在发起只需要等待第一个完成申请并得到数据存储到fetchedStatuses中。

 

if (fetching.contains(shuffleId)) {

 

// Someone else is fetching it; wait for them to be done

 

while (fetching.contains(shuffleId)) {

 

try {

 

fetching.wait()

 

} catch {

 

casee: InterruptedException =>

 

}

 

}

 

}

 

 

 

if (fetchedStatuses == null) {

 

// We won the race to fetch the output locs; do so

 

logInfo("Doing the fetch; tracker actor = " + trackerActor)

 

// This try-finally prevents hangs due to timeouts:

 

try {

 

通过askTracker函数,通过actorrefMapoutputTrackerMasterActor发起GetMapOutputStatuses事件。

 

得到此stage完成的所有的taskMapStatus信息

 

valfetchedBytes =

 

askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]

 

解析成fetchedStatuses数据。

 

fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)

 

logInfo("Got the output locations")

 

添加到executor中的MapStatuses容器中。缓存起来,共下一个task实例。

 

mapStatuses.put(shuffleId, fetchedStatuses)

 

} finally {

 

master中获取数据完成,把fetching中的shuffleid移出。

 

fetching.synchronized {

 

fetching -= shuffleId

 

fetching.notifyAll()

 

}

 

}

 

}

 

if (fetchedStatuses != null) {

 

fetchedStatuses.synchronized {

 

通过指定的shuffleidreduceid的值,得到此reduceblockid中要获取数据的大小。

 

return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)

 

}

 

}

 

else {

 

thrownew FetchFailedException(null, shuffleId, -1, reduceId,

 

new Exception("Missing all output locations for shuffle " + shuffleId))

 

}

 

} else {

 

通过指定的shuffleidreduceid的值,得到此reduceblockid中要获取数据的大小。localcache模式

 

statuses.synchronized {

 

return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)

 

}

 

}

 

}

 

 

 

MapOutputTracker.convertMapStatuses函数

 

private def convertMapStatuses(

 

shuffleId: Int,

 

reduceId: Int,

 

statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {

 

assert (statuses != null)

 

statuses.map {

 

status =>

 

if (status == null) {

 

thrownew FetchFailedException(null, shuffleId, -1, reduceId,

 

new Exception("Missing an output location for shuffle " + shuffleId))

 

} else {

 

取出MapStatus中,针对此reducepartition中的shuffle的内容大小。

 

(status.location, decompressSize(status.compressedSizes(reduceId)))

 

}

 

}

 

}

 

 

 

........

 

0
0
分享到:
评论

相关推荐

    源码 spark shuffle service在中通的优化实践.docx

    【Spark Shuffle Service在中通的优化实践】 Spark Shuffle Service是Apache Spark中用于处理Stage间数据交换的关键组件。在中通快递的业务场景中,随着业务量的快速增长,传统的基于Hive+MapReduce的离线计算方式...

    Spark源码分析.pdf

    此外,Shuffle服务、网络通信库Tachyon和Akka框架也是深入理解Spark的重要部分。 本书可能涵盖了以下几个方面: 1. Spark架构:解释Spark的基本组件和它们之间的交互,以及如何构建Spark应用程序。 2. RDD原理:...

    深入理解Spark 核心思想与源码分析

    本文将深入探讨Spark的核心思想,并通过源码分析来深化理解。 一、Spark核心思想 1. **弹性分布式数据集(Resilient Distributed Datasets, RDD)**:RDD是Spark的核心数据抽象,它是一种不可变、分区的记录集合,...

    深入理解Spark核心思想与源码分析

    《深入理解Spark核心思想与源码分析》这本书是Spark技术领域的深度探索,旨在帮助读者全面了解Spark的核心原理和实现机制。Spark作为一个分布式计算框架,以其高效、易用和多模态处理能力在大数据处理领域备受青睐。...

    spark考试练习题含答案.rar

    这份资料包含两部分:《spark练习题含答案01.docx》和《spark练习题含答案02.docx》,旨在通过实践的方式提升对Spark技术的运用能力。 一、Spark基础篇 Spark的基础部分主要涉及以下几个关键概念: 1. **RDD...

    Spark大数据分析核心概念技术及实践--(美)攫罕默德·古勒

    以上仅是本书《Spark大数据分析核心概念技术及实践》中部分重要知识点的概括,实际上书中还涵盖了更多深入的技术细节和实际应用场景。通过学习这些核心概念和技术,读者将能够更好地理解和掌握Spark这一强大的大数据...

    Spark源码剖析

    这部分内容可能会分析Spark SQL的内核,包括Catalyst查询优化器和Tungsten执行引擎。 5. Spark流式处理分析:介绍Spark的微批处理模型,分析其内部的流式处理机制,包括DStream API的实现原理和流作业的调度。 6. ...

    Spark-2.3.1源码解读

    Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions那些事 高效使用mappartitions standalone模式下executor调度策略 Spark Sql源码阅读 Spark Sql源码阅读 hive on ...

    Spark Summit 2019 部分PPT

    spark summit 5月部分PPT。主要是SQL、core相关的。全部接近200个 看不完,只挑了部分感兴趣的下载回来 Analyzing 2TB of Raw Trace Data from a Manufacturing Process A First Use Case of Apache Spark for ...

    深入理解Spark+核心思想与源码分析

    在源码分析部分,读者会了解到Spark如何通过`SparkContext`初始化,如何调度任务,`Executor`如何执行任务,以及`RDD`的创建、转换和行动操作的实现细节。此外,还会深入到`Shuffle`过程、错误恢复机制、存储策略...

    spark笔记整理文档

    提交Spark作业后,Driver会将作业分解为Stage(基于shuffle划分),然后提交Task到Executor执行。Executor在内存中缓存数据,并在本地执行任务,提高整体性能。 7. Spark内存管理: Spark利用内存存储中间结果,...

    spark-timeSeries.rar_scala 时间序列_spark ARIMA_spark arima_spark 滑

    Spark时间序列分析是一种在大数据处理领域中用于挖掘和预测数据趋势的重要工具,特别是在金融、气象、电商等领域。本文将深入探讨使用Scala语言在Spark平台上实现ARIMA(自回归积分滑动平均模型)和Holt-Winters三次...

    spark HTML查看文档

    Shuffle是Spark作业中的关键步骤,它涉及到数据在不同节点间的重新分布。理解并优化Shuffle可以显著提高Spark作业的性能。例如,通过减少shuffle阶段的数据交换、使用合适的分区策略等方法,可以降低网络I/O,提高...

    带你深入理解Spark核心思想走进Sprak的源码分析

    1. 内存划分:Spark将内存分为两部分,一部分用于存储RDD缓存,另一部分用于存储中间计算结果。这种设计提高了计算效率,减少了磁盘I/O。 2. Tungsten:Spark 2.0引入了Tungsten项目,实现了更高效的内存管理和序列...

    大数据-spark

    **大数据-Spark**主要应用于大规模数据处理领域,特别是对于需要离线分析的大数据集。Spark作为一个高性能的分布式计算框架,提供了强大的工具来处理海量数据,使得数据分析更加高效。 **Spark**的核心组件是**弹性...

    Spark 快速大数据分析

    容错性意味着即使部分数据丢失,Spark也能通过重新计算丢失的部分来恢复。弹性则允许RDD在集群的不同节点间动态移动,以适应资源变化。 **3. Spark 应用开发** Spark 提供了多种编程接口,包括Scala、Java、Python...

    spark笔记.zip

    10. 容错与故障恢复:Spark的容错机制主要依赖于数据分区和血统信息,当Executor失败时,可以从源RDD重新计算丢失的部分。 以上知识点是"Spark笔记"中可能涵盖的内容,Hive.docx可能详细介绍了如何使用Hive进行数据...

    Spark技术内幕深入解析Spark内核架构设计与实现原理.pdf

    10. **Spark Shuffle**:Shuffle是Spark中数据重新分布的过程,对于性能影响较大。优化Shuffle过程可以显著提升Spark作业的效率。 11. **内存管理**:Spark的内存管理包括存储内存和执行内存两部分,通过Tungsten...

    spark之sparkStreaming 理解

    此外,Spark Streaming还能与其他Spark模块(如Spark SQL、MLlib、GraphX等)紧密结合,提供更丰富的数据分析能力。 ### Spark Streaming工作原理 #### 四、工作流程 1. **实时数据接收**:Spark Streaming接收...

Global site tag (gtag.js) - Google Analytics