
[spark-src-core] 2.3 shuffle in spark

 

1. flow

1.1 shuffle abstract

(figure: shuffle abstraction)

1.2 shuffle flow

(figure: overall shuffle flow)

1.3 sort flow in shuffle

(figure: sort flow in shuffle)

1.4 data structure in memory

(figure: in-memory data structures)
2. core code paths

 

//SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = { //-how is this result later collected by partition? via the index file
    //-1 sort result data
    //-both below cases will spill if over threshold
    val ts = System.currentTimeMillis()
    //-compare with the reduce-side combine in HashShuffleReader#read(); here is the real map side.
    if (dep.mapSideCombine) { //-acts as the map side's Combiner; note: even if no aggregator is provided, DAGScheduler will add one-
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") //by default, but no keyOrdering, e.g.
      sorter = new ExternalSorter[K, V, C](   //reduceByKey()
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      logInfo("-aggr:" + dep.aggregator + ",key ord:" + dep.keyOrdering
        +",ser:" + dep.serializer +",part:"+dep.partitioner + ",dep " + dep)

      sorter.insertAll(records) //-if no ordering is given, the key's hashcode is used to sort within each partition
    } else { //-no map-side combine is given, e.g. groupBy
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the *reduce side*
      // if the operation being run is *sortByKey.*
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }
    logInfo("*total cost of sorting(ms) " + (System.currentTimeMillis()-ts))
    ///-2 write to data file then index file
    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)//-same as the shuffle output file in insertAll()
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)

    //-means the result is always written to disk, even if the data is small.
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    logInfo("-output file:" + outputFile + ",blockid:" + blockId
      + ",part len " + partitionLengths.length + ",total " + partitionLengths.sum
      +",shuffle server id " + blockManager.shuffleServerId + ",shuffleId " + dep.shuffleId)

    //-3 encapsulate the result(serialization is placed in Executor#launchTask())
    //-used by MapOutputTracker#getServerStatuses()
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)//-contains no real data, only the shuffle server id and per-partition sizes
  }

  /** Close this writer, passing along whether the map completed */
  override def stop(success: Boolean): Option[MapStatus] = {
    try {
      if (stopping) {
        return None
      }
      stopping = true
      if (success) {
        return Option(mapStatus)
      } else {
        // The map task failed, so delete our output data.
        shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
        return None
      }
    } finally {
      // Clean up our sorter, which may have its own intermediate files
      if (sorter != null) {
        val startTime = System.nanoTime()
        sorter.stop()
        context.taskMetrics.shuffleWriteMetrics.foreach(
          _.incShuffleWriteTime(System.nanoTime - startTime))
        sorter = null
      }
    }
  }
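
The write() above answers its own leading question: the per-partition result is located through the index file written by shuffleBlockResolver.writeIndexFile(). As far as I can tell from IndexShuffleBlockResolver, that index is just cumulative offsets. Below is a minimal stand-alone sketch of that layout (the object and method names are mine, not Spark's API): numPartitions + 1 longs, so a reducer can turn its reduceId into a [start, end) byte range inside the single data file.

//IndexFileSketch (illustrative only, not part of Spark)
import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

object IndexFileSketch {
  //-write numPartitions + 1 longs: 0, len(0), len(0)+len(1), ... (same shape as partitionLengths above)
  def writeIndex(indexFile: File, partitionLengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(indexFile))
    try {
      var offset = 0L
      out.writeLong(offset)
      for (len <- partitionLengths) {
        offset += len
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
  }

  //-a reducer asking for partition reduceId reads longs reduceId and reduceId + 1,
  //-which give the [start, end) byte range of its segment in the single data file
  def readOffsets(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8)
      (in.readLong(), in.readLong())
    } finally {
      in.close()
    }
  }
}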
 
//ExternalSorter
/**-this file's data layout is the same as a spilled file's.
   * Write all the data added into this ExternalSorter into *a file* in the disk store. This is
   * called by the SortShuffleWriter and can go through an efficient path of just concatenating
   * binary files if we decided to avoid merge-sorting.
   *
   * @param blockId block ID to write to. The index file will be blockId.name + ".index".-note
   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
   */
  def writePartitionedFile(
      blockId: BlockId,
      context: TaskContext,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    //-case 1: bypass merge-sort, concatenate the per-partition files
    if (bypassMergeSort && partitionWriters != null) {
      logInfo("-bypass:" + bypassMergeSort+",pwriters:" + partitionWriters.length)
      // We decided to write separate files for each partition, so just concatenate them. To keep
      // this simple we spill out the current in-memory collection so that everything is in files.-so no order is guaranteed
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())
      val out = new FileOutputStream(outputFile, true) //-note here,append is 'true'
      val writeStartTime = System.nanoTime
      util.Utils.tryWithSafeFinally {
        for (i <- 0 until numPartitions) {
          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
          util.Utils.tryWithSafeFinally {
            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
          } {
            in.close()
          }
        }
      } {
        out.close()
        context.taskMetrics.shuffleWriteMetrics.foreach(
          _.incShuffleWriteTime(System.nanoTime - writeStartTime))
      }
    } else if (spills.isEmpty && partitionWriters == null) {//-we get here if no shuffle data was spilled to disk; same check as spill()
      logInfo("-no spills occurred")
      //2 Case where we only have in-memory data
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator) //-same as spillToMergableFile()
      while (it.hasNext) { //-note: this is a double loop
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get) //-using file's append mode,so only one file will be created
        val partitionId = it.nextPartition() //-base part id
        while (it.hasNext && it.nextPartition() == partitionId) { //-the data is already ordered by partition id, so
          it.writeNext(writer)      //-one partition's records are written out as one contiguous segment
        }
        writer.commitAndClose() //-all of this partition's data has been written out
        val segment = writer.fileSegment()
        lengths(partitionId) = segment.length //-record its size
      }
    } else { //-case 3: spilled files plus remaining in-memory data (merged together and written to the final file)
      logInfo("-merge spilled file and in-mem data?,part it:" + this.partitionedIterator)
      // Not bypassing merge-sort; get an iterator by partition and just write everything directly.
      for ((id, elements) <- this.partitionedIterator) { //-use multiway merge sorter
        if (elements.hasNext) {
          val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
            context.taskMetrics.shuffleWriteMetrics.get)
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          writer.commitAndClose()
          val segment = writer.fileSegment()
          lengths(id) = segment.length
        }
      }
    }

    context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
    context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
      if (curWriteMetrics != null) {
        m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
        m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
        m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
      }
    }

    lengths
  }
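
Case 2 above is what the "double loop" note refers to: the collection is destructively sorted by partition id, the outer loop opens a writer for the next partition, and the inner loop drains that partition's consecutive records into one file segment. A tiny stand-alone sketch of that grouping pattern on a plain Scala iterator (my own names, no Spark types involved):

//PartitionedWriteSketch (illustrative only)
object PartitionedWriteSketch {
  //-records must already be sorted by partition id, mirroring
  //-destructiveSortedWritablePartitionedIterator above
  def segmentLengths(numPartitions: Int, sorted: Iterator[(Int, String)]): Array[Long] = {
    val lengths = new Array[Long](numPartitions)
    val it = sorted.buffered
    while (it.hasNext) {
      val partitionId = it.head._1            //-outer loop: the next partition to write
      var bytes = 0L
      while (it.hasNext && it.head._1 == partitionId) {
        bytes += it.next()._2.length          //-inner loop: "write" this partition's consecutive records
      }
      lengths(partitionId) = bytes            //-record the segment length, like lengths(partitionId) above
    }
    lengths
  }
}
//-e.g. segmentLengths(3, Iterator((0, "a"), (0, "bb"), (2, "ccc"))) == Array(3L, 0L, 3L)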
 

 

3. FAQ (TBD/TODO)

Here are some questions I'm not clear about, so any clues from you are highly appreciated:

A. When and where does a ResultTask know to fetch result data from a ShuffleMapTask? For example, take a job with:

  3 maps (m1, m2, m3) x 2 reduces (r1, r2)

t1: m1, m2, m3 are all running;

t2: all maps keep running except m1, which has finished; r1 is then notified to set up and fetch the result of m1;

t3: m2 finishes. The question is: when does r1 know to fetch the result of m2? My guess is that there should be a shared place where reducers learn where the maps are and when they have finished, but I have not yet found that in the sources.
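
(A rough sketch of the kind of lookup I would expect that shared place to provide, using the MapOutputTracker#getServerStatuses() call already named in the write() comment above, so treat the exact signature as an assumption. Also, if I read DAGScheduler right, the reduce stage is only submitted after the whole map stage has completed, so in the example r1 would not actually start before m1, m2 and m3 are all done.)

//sketch: reduce-side lookup against the MapOutputTracker (names as I read them in this tree; not a definitive API)
import org.apache.spark.SparkEnv

def locateMapOutputs(shuffleId: Int, reduceId: Int): Unit = {
  //-one entry per map, indexed by mapId: (shuffle server of that map, size of reduceId's segment)
  val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
  statuses.zipWithIndex.foreach { case ((blockManagerId, size), mapId) =>
    println(s"fetch shuffle_${shuffleId}_${mapId}_$reduceId from $blockManagerId ($size bytes)")
  }
}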

 

B. In BlockManager, see the #Question# below:

private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    //-1 get the block's locations (replicas); note: this block is placed in TaskRunner#run() in the 'indirect result' case
    val locations = Random.shuffle(master.getLocations(blockId)) //-deliver to BlockManagerMasterEndpoint
    //- #Question# why not identify which BlockManager is nearest to this node, whether in cluster or even local mode? (vip)
    for (loc <- locations) { //-multiple hosts hold the same blockId, so one successful fetch is enough if the data is valid
      logDebug(s"Getting remote block $blockId from $loc")
      //-2 fetch real data
      val data = blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

      if (data != null) { //-got it from one of the replica nodes; return immediately
        if (asBlockResult) {
          return Some(new BlockResult(
            dataDeserialize(blockId, data),
            DataReadMethod.Network,
            data.limit()))
        } else {
          return Some(data)
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }
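
On the #Question# above, here is a minimal sketch of the locality preference I was expecting, i.e. reorder the shuffled locations so that replicas on the local host are tried first. Purely illustrative; this is not what doGetRemote does in this version, and the helper is my own.

//sketch: prefer replicas on the local host (illustrative only)
import scala.util.Random
import org.apache.spark.storage.BlockManagerId

def preferLocalHost(locations: Seq[BlockManagerId], localHost: String): Seq[BlockManagerId] = {
  //-random shuffle first so load still spreads across equally-ranked replicas,
  //-then move same-host block managers to the front
  val (sameHost, others) = Random.shuffle(locations).partition(_.host == localHost)
  sameHost ++ others
}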

 

 

 

 

 

 
