`
wbj0110
  • 浏览: 1611228 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark源码系列(六)Shuffle的过程解析

阅读更多

Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。

这篇文章主要是沿着下面几个问题来开展:

1、shuffle过程的划分?

2、shuffle的中间结果如何存储?

3、shuffle的数据如何拉取过来?

Shuffle过程的划分

Spark的操作模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等类似的操作的时候,就需要有shuffle了。再拿出reduceByKey这个来讲。

  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

reduceByKey的时候,我们可以手动设定reduce的个数,如果不指定的话,就可能不受控制了。

复制代码
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism)
    } else { new HashPartitioner(bySize.head.partitions.size)
    }
  }
复制代码

如果不指定reduce个数的话,就按默认的走:

1、如果自定义了分区函数partitioner的话,就按你的分区函数来走。

2、如果没有定义,那么如果设置了spark.default.parallelism,就使用哈希的分区方式,reduce个数就是设置的这个值。

3、如果这个也没设置,那就按照输入数据的分片的数量来设定。如果是hadoop的输入数据的话,这个就多了。。。大家可要小心啊。

设定完之后,它会做三件事情,也就是之前讲的3次RDD转换。

 View Code

1、在第一个MapPartitionsRDD这里先做一次map端的聚合操作。

2、ShuffledRDD主要是做从这个抓取数据的工作。

3、第二个MapPartitionsRDD把抓取过来的数据再次进行聚合操作。

4、步骤1和步骤3都会涉及到spill的过程。

怎么做的聚合操作,回去看RDD那章。

Shuffle的中间结果如何存储

作业提交的时候,DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),具体的切分的位置在上图的虚线处。

map端的任务会作为一个ShuffleMapTask提交,最后在TaskRunner里面调用了它的runTask方法。

复制代码
  override def runTask(context: TaskContext): MapStatus = {
    val numOutputSplits = dep.partitioner.numPartitions
    metrics = Some(context.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    val shuffleBlockManager = blockManager.shuffleBlockManager var shuffle: ShuffleWriterGroup = null var success = false try { // serializer为空的情况调用默认的JavaSerializer,也可以通过spark.serializer来设置成别的 val ser = Serializer.getSerializer(dep.serializer) // 实例化Writer,Writer的数量=numOutputSplits=前面我们说的那个reduce的数量 shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser) // 遍历rdd的元素,按照key计算出来它所在的bucketId,然后通过bucketId找到相应的Writer写入 for (elem <- rdd.iterator(split, context)) {
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1)
        shuffle.writers(bucketId).write(pair)
      } // 提交写入操作. 计算每个bucket block的大小 var totalBytes = 0L var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit()
        writer.close()
        val size = writer.fileSegment().length
        totalBytes += size
        totalTime += writer.timeWriting()
        MapOutputTracker.compressSize(size)
      } // 更新 shuffle 监控参数. val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
      shuffleMetrics.shuffleWriteTime = totalTime
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      success = true new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch { case e: Exception => // 出错了,取消之前的操作,关闭writer if (shuffle != null && shuffle.writers != null) { for (writer <- shuffle.writers) {
          writer.revertPartialWrites()
          writer.close()
        }
      } throw e
    } finally { // 关闭writer if (shuffle != null && shuffle.writers != null) { try {
          shuffle.releaseWriters(success)
        } catch { case e: Exception => logError("Failed to release shuffle writers", e)
        }
      } // 执行注册的回调函数,一般是做清理工作  context.executeOnCompleteCallbacks()
    }
  }
复制代码

遍历每一个记录,通过它的key来确定它的bucketId,再通过这个bucket的writer写入数据。

下面我们看看ShuffleBlockManager的forMapTask方法吧。

复制代码
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) private val shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup = null val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) // 从已有的文件组里选文件,一个bucket一个文件,即要发送到同一个reduce的数据写入到同一个文件  blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => // 按照blockId来生成文件,文件数为map数*reduce数 val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId) if (blockFile.exists) { if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }
复制代码

1、map的中间结果是写入到本地硬盘的,而不是内存。

2、默认是一个Executor的中间结果文件是M*R(M=map数量,R=reduce的数量),设置了spark.shuffle.consolidateFiles为true之后是R个文件,根据bucketId把要分到同一个reduce的结果写入到一个文件中。

3、consolidateFiles采用的是一个reduce一个文件,它还记录了每个map的写入起始位置,所以查找的时候先通过reduceId查找到哪个文件,再通过mapId查找索引当中的起始位置offset,长度length=(mapId + 1).offset -(mapId).offset,这样就可以确定一个FileSegment(file, offset, length)。

4、Finally,存储结束之后, 返回了一个new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一起返回。

个人想法,shuffle这块和hadoop的机制差别不大,tez这样的引擎会赶上spark的速度呢?还是让我们拭目以待吧!

Shuffle的数据如何拉取过来

ShuffleMapTask结束之后,最后走到DAGScheduler的handleTaskCompletion方法当中(关于中间的过程,请看《图解作业生命周期》)。

 View Code

1、把结果添加到Stage的outputLocs数组里,它是按照数据的分区Id来存储映射关系的partitionId->MapStaus。

2、stage结束之后,通过mapOutputTracker的registerMapOutputs方法,把此次shuffle的结果outputLocs记录到mapOutputTracker里面。

这个stage结束之后,就到ShuffleRDD运行了,我们看一下它的compute函数。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)

它是通过ShuffleFetch的fetch方法来抓取的,具体实现在BlockStoreShuffleFetcher里面。

 View Code

1、MapOutputTrackerWorker向MapOutputTrackerMaster获取shuffle相关的map结果信息。

2、把map结果信息构造成BlockManagerId --> Array(BlockId, size)的映射关系。

3、通过BlockManager的getMultiple批量拉取block。

4、返回一个可遍历的Iterator接口,并更新相关的监控参数。

我们继续看getMultiple方法。

复制代码
 def getMultiple(
      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
      serializer: Serializer): BlockFetcherIterator = {
    val iter = if (conf.getBoolean("spark.shuffle.use.netty", false)) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
      } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
      }

    iter.initialize()
    iter
  }
复制代码

分两种情况处理,分别是netty的和Basic的,Basic的就不讲了,就是通过ConnectionManager去指定的BlockManager那里获取数据,上一章刚好说了。

我们讲一下Netty的吧,这个是需要设置的才能启用的,不知道性能会不会好一些呢?

看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,发现Basic的不能同时抓取超过48Mb的数据。

复制代码
    override def initialize() { // 分开本地请求和远程请求,返回远程的FetchRequest val remoteRequests = splitLocalRemoteBlocks() // 抓取顺序随机 for (request <- Utils.randomize(remoteRequests)) {
        fetchRequestsSync.put(request)
      } // 默认是开6个线程去进行抓取 copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 读取本地的block  getLocalBlocks()
   }
复制代码

在NettyBlockFetcherIterator的sendRequest方法里面,发现它是通过ShuffleCopier来试下的。

  val cpier = new ShuffleCopier(blockManager.conf)
   cpier.getBlocks(cmId, req.blocks, putResult)

这块接下来就是netty的客户端调用的方法了,我对这个不了解。在服务端的处理是在DiskBlockManager内部启动了一个ShuffleSender的服务,最终的业务处理逻辑是在FileServerHandler。

它是通过getBlockLocation返回一个FileSegment,下面这段代码是ShuffleBlockManager的getBlockLocation方法。

复制代码
  def getBlockLocation(id: ShuffleBlockId): FileSegment = { // Search all file groups associated with this shuffle. val shuffleState = shuffleStates(id.shuffleId) for (fileGroup <- shuffleState.allFileGroups) {
      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) if (segment.isDefined) { return segment.get }
    } throw new IllegalStateException("Failed to find shuffle block: " + id)
  }
复制代码

先通过shuffleId找到ShuffleState,再通过reduceId找到文件,最后通过mapId确定它的文件分片的位置。但是这里有个疑问了,如果启用了consolidateFiles,一个reduce的所需数据都在一个文件里,是不是就可以把整个文件一起返回呢,而不是通过N个map来多次读取?还是害怕一次发送一个大文件容易失败?这就不得而知了。

到这里整个过程就讲完了。可以看得出来Shuffle这块还是做了一些优化的,但是这些参数并没有启用,有需要的朋友可以自己启用一下试试效果。

http://www.luobo360.com/article/140

分享到:
评论

相关推荐

    java毕设项目之ssm基于SSM的高校共享单车管理系统的设计与实现+vue(完整前后端+说明文档+mysql+lw).zip

    项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    YOLO算法-贴纸检测数据集-212张图像带标签-部分覆盖-未涵盖-完全覆盖.zip

    YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;

    zigbee CC2530无线自组网协议栈系统代码实现协调器按键控制终端LED灯和继电器动作.zip

    1、嵌入式物联网单片机项目开发例程,简单、方便、好用,节省开发时间。 2、代码使用IAR软件开发,当前在CC2530上运行,如果是其他型号芯片,请自行移植。 3、软件下载时,请注意接上硬件,并确认烧录器连接正常。 4、有偿指导v:wulianjishu666; 5、如果接入其他传感器,请查看账号发布的其他资料。 6、单片机与模块的接线,在代码当中均有定义,请自行对照。 7、若硬件有差异,请根据自身情况调整代码,程序仅供参考学习。 8、代码有注释说明,请耐心阅读。 9、例程具有一定专业性,非专业人士请谨慎操作。

    手语图像分类数据集【已标注,约2,500张数据】

    手语图像分类数据集【已标注,约2,500张数据】 分类个数【36】:0、1、a、b等【具体查看json文件】 划分了训练集、测试集。存放各自的同一类数据图片。如果想可视化数据集,可以运行资源中的show脚本。 CNN分类网络改进:https://blog.csdn.net/qq_44886601/category_12858320.html 【更多图像分类、图像分割(医学)、目标检测(yolo)的项目以及相应网络的改进,可以参考本人主页:https://blog.csdn.net/qq_44886601/category_12803200.html】

    CNCAP 2024打分表

    CNCAP 2024打分表

    基于小程序的智慧校园管理系统源代码(java+小程序+mysql+LW).zip

    系统可以提供信息显示和相应服务,其管理智慧校园管理系统信息,查看智慧校园管理系统信息,管理智慧校园管理系统。 项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 部署容器:tomcat7 小程序开发工具:hbuildx/微信开发者工具

    【图像去噪】基于matlab PolSAR GWLS滤波器图像去噪【含Matlab源码 9937期】.zip

    Matlab领域上传的视频均有对应的完整代码,皆可运行,亲测可用,适合小白; 1、代码压缩包内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作

    影音互动科普网站-JAVA-基于SpringBoot的哈利波特书影音互动科普网站设计与实现(毕业论文)

    影音互动科普网站功能描述 影音互动科普网站旨在通过多媒体形式(视频、音频、互动内容等)传播科学知识,提高公众的科学素养。该网站结合娱乐与教育,提供易于理解的科普内容,吸引不同年龄层次的用户参与和学习。以下是该网站的主要功能描述: 1. 用户注册与登录 用户注册:用户可以通过电子邮箱、手机号或社交账号(如微信、微博等)注册,提供基本信息并设置密码。 用户登录:支持通过注册的账号登录,保障个人信息的安全性,并提供自动登录功能。 2. 科普视频与音频库 视频内容:网站提供各类科普视频,包括短视频、纪录片、讲座、实验演示等,覆盖物理、化学、生物、地理、天文等多个领域。 音频内容:提供科普音频节目,如科普广播、播客、专题讲座等,便于用户在日常生活中进行学习。 视频分类:按科目、难度、年龄层、时长等维度对视频和音频进行分类,帮助用户更精准地找到感兴趣的内容。 字幕与多语言支持:提供字幕、翻译和多语种版本,帮助不同语言的用户学习。 3. 互动问答与讨论区 专家问答:用户可以向科普专家提问,专家提供详尽的解答,解决用户的科学疑惑。 社区讨论:用户可以在视频下方或专题页面中发表评论、提问或与其他用户

    倪海厦讲义及笔记,易学数据测算

    倪海厦讲义及笔记,易学数据测算

    【组合数学答案】组合数学-苏大李凡长版-课后习题答案

    内容概要:本文档是《组合数学答案-网络流传版.pdf》的内容,主要包含了排列组合的基础知识以及一些经典的组合数学题目。这些题目涵盖了从排列数计算、二项式定理的应用到容斥原理的实际应用等方面。通过对这些题目的解析,帮助读者加深对组合数学概念和技巧的理解。 适用人群:适合初学者和有一定基础的学习者。 使用场景及目标:可以在学习组合数学课程时作为练习题参考,也可以在复习考试或准备竞赛时使用,目的是提高解决组合数学问题的能力。 其他说明:文档中的题目覆盖了组合数学的基本知识点,适合逐步深入学习。每个题目都有详细的解答步骤,有助于读者掌握解题思路和方法。

    管理系统开发指南:功能要求、技术栈及安全控制

    内容概要:本文是一篇完整的管理系统开发指南,详细介绍了功能要求、技术栈选择、数据库设计、用户界面搭建以及安全控制等方面的内容。功能要求包括用户管理、权限控制、数据管理、系统日志、通知与消息、统计分析和扩展模块。使用的技术栈涵盖了后端(Java、Python、C#等)和前端(React、Vue.js、Angular等)技术,以及数据库设计和安全控制措施。 适合人群:具备一定开发经验的软件工程师和技术管理人员。 使用场景及目标:适用于企业级管理系统开发项目,旨在构建一个高效、安全且易于扩展的系统。开发者可以参考本文档进行系统的设计和实现,确保系统满足业务需求。 其他说明:本文档提供了详细的步骤和最佳实践,帮助开发者更好地理解和应用管理系统开发的各种技术。通过结合实际案例和实践经验,本文档能够为开发者提供有价值的指导。

    听器听力损伤程度分级表.docx

    听器听力损伤程度分级表.docx

    MATLAB代码:基于条件风险价值的合作型Stackerlberg博弈微网动态定价与优化调度 关键词:微网优化调度 条件风险价值 合作博弈 纳什谈判 参考文档:A cooperative Stack

    MATLAB代码:基于条件风险价值的合作型Stackerlberg博弈微网动态定价与优化调度 关键词:微网优化调度 条件风险价值 合作博弈 纳什谈判 参考文档:《A cooperative Stackelberg game based energy management considering price discrimination and risk assessment》完美复现 仿真平台:MATLAB yalmip+cplex+mosek 主要内容:代码主要做的是一个基于合作型Stackerlberg博弈的考虑差别定价和风险管理的微网动态定价与调度策略,提出了一个双层能源管理框架,实现多个微网间的P2P能源交易,上层为零商的动态定价模型,目标是社会福利最大化;下层是多个产消者的合作博弈模型,优化各产消者的能量管理策略。 同时,采用纳什谈判法对多个产消者的合作剩余进行公平分配,还考虑了运行风险,采用条件风险价值(CVaR)随机规划方法来描述零商的预期损失。 求解方面,双层模型被基于KKT条件转为单层模型,模型可以高效求解。 这段代码是一个基于合作型Stackelberg博弈的微网

    YOLO算法-监控数据集-873张图像带标签-警方-警车-救护车-消防车-跌倒的人-消防员.zip

    YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;

    (175526236)【动漫网页设计】源码免费分享,让你的网站更有趣!

    20块钱买的【动漫网页设计】源码,免费分享出来啦,如果要积分那是系统自动涨的啦。 内容概要:本资源是一份动漫网页设计的源码,价格仅为20元,作者将其免费分享给大家。该源码包含了动漫元素的设计,包括背景、图标、按钮等,同时也提供了一些常见的网页布局和交互效果。通过该资源,可以学习到动漫网页设计的基本原理和技巧。 适用人群:本资源适用于对动漫网页设计感兴趣的人群,包括网页设计师、UI设计师、前端开发工程师等。同时,对于想要学习动漫网页设计的初学者也非常适用。 使用场景及目标:该资源可以用于学习和实践动漫网页设计的技巧和原理。通过学习该源码,可以了解到动漫网页设计的基本要素和设计思路,同时也可以借鉴其中的设计元素和交互效果,应用到自己的网页设计中。 其他说明:本资源是作者自己设计的,经过了多次修改和优化,具有一定的参考价值。同时,作者也将其价格设置的非常低,希望更多的人可以学习到动漫网页设计的技巧和方法。如果您对该资源有任何疑问或建议,欢迎在评论区留言,作者会尽快回复。。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    C++程序设计-参考答案

    自考 本科 C++程序设计-课本 参考答案

    每周质量安全排查报告.docx

    每周质量安全排查报告.docx

    YOLO算法-杂草检测项目数据集-3970张图像带标签-杂草.zip

    YOLO算法-杂草检测项目数据集-3970张图像带标签-杂草.zip

    内存搜索工具(易).rar

    内存搜索工具(易).rar

    2024 AGM Meritech Market Section (External).pdf

    AI大模型研究相关报告

Global site tag (gtag.js) - Google Analytics