阅读更多

1顶
0踩

数据库
区别常见的Embarrassingly Parallel系统,类似MapReduce和Apache Spark(Apache Hadoop的下一代数据处理引擎)这样的计算引擎主要区别在于对“all-to-all” 操作的支持上。和许多分布式引擎一样,MapReduce和Spark的操作通常针对的是被分片数据集的子分片,很多操作每次只处理单个数据节点,同时这些操作所涉及到的数据往往都只存在于这个数据片内。all-to-all操作必须将数据集看作一个整体,而每个输出结果都可以总结自不同分片上的记录。Spark的groupByKey、sortByKey,还有reduceByKey这些shuffle功能都属于这方面常见的操作。

在这些分布式计算引擎中,shuffle指的是在一个all-to-all操作中将数据再分割和聚合的操作。显而易见,在实践生产中,我们在Spark部署时所发现的大多性能、可扩展性及稳定性问题都是在shuffle过程中产生的。

Cloudera和英特尔的工程师们正通力合作以扩展Spark的shuffle,使得shuffle可以更加快速与稳定地处理大量的数据集。Spark在很多方面相较MapReduce有更多优势,同时又在稳定性与可扩展性上相差无几。在此,我们从久经考验的MapReduce shuffle部署中吸取经验,以提高排序数据输出的shuffle性能。

在本文中,我们将会逐层解析——介绍目前Spark shuffle的运作实现模式,提出修改建议,并对性能的提高方式进行分析。更多的工作进展可以于正在进行中的SPARK-2926发现。

Spark目前的运作实现模式

一个shuffle包含两组任务:1. 产生shuffle数据的阶段;2.使用shuffle数据的阶段。鉴于历史原因,写入数据的任务被称做“map task”,而读取数据的任务被称做“reduce tasks”,但是以上角色分配只局限于单个job的某个具体shuffle过程中。在一个shuffle中扮演reduce的task,在另一个shuffle中可能就是map了,因为它在前者里面执行的是读取操作,而在后者中执行的是数据写入任务,并在随后的阶段中被消费。

MapReduce和Spark的shuffle都使用到了“pull”模式。在每个map任务中,数据被写入本地磁盘,然后在reduce任务中会远程请求读取这些数据。由于shuffle使用的是all-to-all模式,任何map任务输出的记录组都可能用于任意reduce。一个job在map时的shuffle操作基于以下原则:所有用于同一个reduce操作的结果都会被写入到相邻的组别中,以便获取数据时更为简单。

Spark默认的shuffle实现(即hash-based shuffle)是map阶段为每个reduce任务单独打开一个文件,这种操作胜在简单,但实际中却有一些问题,比如说实现时Spark必须维持大量的内存消耗,或者造成大量的随机磁盘I/O。此外,如果M和R分别代表着一个shuffle操作中的map和reduce数量,则hash-based shuffle需要产生总共M*R个数量的临时文件,Shuffle consolidation将这个数量减至C*R个(这里的C代表的是同时能够运行的map任务数量),但即便是经过这样的修改之后,在运行的reducer数量过多时还是经常会出现“文件打开过多”的限制。






Hash-based shuffle中单个map任务




Sort-based shuffle中单个map任务

为了进一步提高shuffle的稳定性与性能,从1.1版本开始,Spark引入了“sort-based shuffle”实现,其功能与MapReduce使用的map方式十分类似。在部署时,每个任务的map输出结果都会被储存在内存里(直到可用内存耗尽),然后在reduce任务中进行排序,之后再spill到一个单独的文件。如果在单个任务中该操作发生了多次,那么这个任务的输出将被合并。

在reduced的过程中,一组线程负责抓取远程的map输出blocks。当数据进入后,它们会被反序列化,再转化成一个适用于执行all-to-all操作的数据结构。在类似groupByKey、reduceByKey,还有aggregateByKey之类的聚合操作中,其结果会变成一个ExternalAppendOnlyMap(本质上是一个内存溢出时会spill到硬盘的哈希map)。在类似sortByKey的排序操作中,输出结果会变成一个ExternalSorter(将结果分类后可能会spill到硬盘,并在对结果进行排序后返回一个迭代程序)。

完全Sort-based Shuffle

上文所描述的方式有两个弊端:

  • 每个Spark reduce的任务都需要同时打开大量的反序列化记录,从而导致内存的大量消耗,而大量的Java对象对JVM的垃圾收集(garbage collection)产生压力,会造成系统变慢和卡顿,同时由于这个版本较之序列化的版本内存消耗更为巨大,因而Spark必须更早更频繁的spill,造成硬盘I/O也更为频繁。此外,由于判断反序列化对象的内存占用情况时难以达到100%的准确率,因此保持大量的反序列化对象会加剧内存不足的可能性。
  • 在引导需要在分片内的排序操作时,我们需要进行两次排序:mapper时按分片排序,reducer时按Key排序。

我们修改了map时在分片内按Key对结果进行排序,这样在reduce时我们只要合并每个map任务排序后的吧blocks即可。我们可以按照序列化的模式将每个block存到内存中,然后在合并时逐一地将结果反序列化。这样任何时候,内存中反序列化记录的最大数量就是已经合并的blocks总量。



完全sort-based shuffle中的单个map任务

单个reduce任务可以接收来自数以千计map任务的blocks,为了使得这个多路归并更加高效,尤其是在数据超过可用内存的情况下,我们引入了分层合并( tiered merger)的概念。如果需要合并许多保存在磁盘上的blocks,这样做可以最小化磁盘寻道数量。分层合并同样适用于ExternalAppendOnlyMap以及ExternalSorter的内部合并步骤,但是暂时我们还没有进行修改。

高性能合并

每个任务中有一组线程是负责同步抓取shuffle数据的,每个任务对应的内存池有48MB,用来存放相应的数据。

我们引入了SortShuffleReader,先从内存池中获取到blocks,然后[key, value]的方式向用户代码中返回迭代器对象。

Spark有一个所有任务共享的shuffle内存区域,默认大小是完整executor heap的20%。当blocks进入时,SortShuffleReader会尝试从该主区域中调用shuffle所需的内存,直至内存塞满调用失败为止,然后我们需要将数据spill到硬盘上以释放内存。SortShuffleReader将所有(好吧,并非所有的,有时候只会spill一小部分)内存中的数据块写入一个单独的文件中并存入硬盘。随着blocks被存入硬盘,一个后台线程会对其进行监视,并在必要时将这些文件合并为更大一些的磁盘blogs。“final merge”会将所有最终硬盘与内存中的blocks全部合并起来。

如何确定是时候进行一个临时的“磁盘到磁盘”合并?

spark.shuffle.maxMergeFactor(默认为100)控制着一次可以合并的硬盘blocks数量的最大值,当硬盘blocks的数量超过限制时,后台线程会运行一次合并以降低这个数量(但是不会马上奏效,详情请查看代码)。在确定需要合并多少blocks时,线程首先会将需要执行合并的blocks数量设定为最小值,并将这个值作为合并数量的上限,以期尽可能减少blocks的合并次数。因此,如果spark.shuffle.maxMergeFactor是100,而磁盘blocks的最终数量为110,这样只需总共进行11个blocks的合并,就可将最终磁盘blocks的数量保持在恰好100。想要再合并哪怕一个blocks,都会需要再一次的额外合并,而可能导致不必要的磁盘I/O。



maxMergeWidth为4的分层合并。每个矩形代表一个segment,其中三个合并为一个,然后最终有四个segment被合并到一个迭代器中,以备下一次操作使用。

与sortByKey的性能对比

我们测试了使用SparkPerf进行sortbykey时,在相应的修改后,性能有何变化。在其中我们选择了两个不同大小的数据集,以比较我们的改动在内存足以支持所有shuffle数据时,和不足以支持的情况下对于性能的增益情况。

Spark的sortByKey变化导致两个job和三个stage。

  • Sample stage:进行数据取样以创建一个分区范围,分区大小相等。
  • Map阶段:写入为reduce阶段准备的shuffle bucket。
  • Reduce阶段:得到相关的shuffle结果,按特定的数据集分区进行合并/分类。

引入一个6节点集群的基准,每个executor包含24个core和36GB的内存,大数据集有200亿条记录,压缩后在HDFS上占409.8GB。小数据集有20亿条记录,压缩后在HDFS上占15.9GB。每条记录都包含一对10个字符串的键值对,在两个case中,我们在超过1000个分片中测试了排序,每个stage的运行时间表以及总共的job如下图显示:



大数据集(越低则越好)




小数据集(越低则越好)

取样阶段耗时相同,因为此阶段并不涉及shuffle过程;在map阶段,在我们的改进下,每个分片中按Key对数据进行排序,导致这个阶段的运行时间增加了(大数据集增加了37%,小数据集则是27%)。但是增加的时间在reduce阶段得到了更大的补偿,由于现在只需合并排序后的数据,Reduce阶段的两个数据集的耗时共减少了66%,从而使得大数据集加速27%,小数据集加速17%。

下面还有什么?

SPARK-2926是Spark shuffle的几个改进计划的成果之一,在这个版本中很多方面上shuffle可以更好地管理内存:
  • SPARK-4550 用内存缓冲中的map输出数据作为原始数据,取代Java对象。map输出数据的空间消耗更少,从而使得spill更少,在原始数据的对比上更快。
  • SPARK-4452 更详细地追踪不同shuffle数据结构的内存分配,同时将无需消耗的内存尽早返还。
  • SPARK-3461 追踪agroupBy后出现的特定Key值相应字符串或者节点,而不是一次将其全部loading入内存。

作者简介:Sandy Ryza是Cloudera公司的数据科学家、Hadoop提交者,同时也是Spark的贡献者之一。他还是Advanced Analytics with Spark一书的作者之一。

Saisai(Jerry)Shao是一名英特尔公司的软件工程师,同时也是Spark的贡献者之一。
英文原文:Improving Sort Performance in Apache Spark: It’s a Double
  • 大小: 67.9 KB
  • 大小: 67.9 KB
  • 大小: 72.9 KB
  • 大小: 71.8 KB
  • 大小: 28 KB
  • 大小: 43.8 KB
  • 大小: 40.4 KB
1
0
评论 共 1 条 请登录后发表评论
1 楼 yueyunyue 2015-01-27 10:05
沙发。。。。。。。

发表评论

您还没有登录,请您登录后再发表评论

相关推荐

  • 双倍提升ApacheSpark排序性能

    区别常见的Embarrassingly Parallel系统,类似MapReduce和Apache Spark(Apache Hadoop的下一代数据处理引擎)这样的计算引擎主要区别在于对“all-to-all” 操作的支持上。和许多分布式引擎一样,MapReduce和...

  • 【Spark】Dataset与DataFrame的使用

    是因为他作于于列,生成的的对象是无类型的 * 建议:尽可能多的使用groupBy */ import org.apache.spark.sql.functions._ ds.groupBy('name).agg(mean("age")).show() } 结果展示 思考:为什么groupByKey是有类型的...

  • 湖仓一体技术调研(Apache Hudi、Iceberg和Delta lake对比)

    湖仓一体技术调研(Apache Hudi、Iceberg和Delta lake对比) 作者:程哥哥、刘某迎 、杜某安、刘某、施某宇、严某程 1 引 言 ​ 随着当前的大数据技术逐步革新,企业对单一的数据湖和数仓架构并不满意。越来越多的企业...

  • 第25课 Spark Hash Shuffle源码解读与剖析

    第25课:14 Spark Hash Shuffle源码解读与剖析Spark 2.1x 现在的版本已经没有Hash Shuffle的方式,那为什么我们还要讲解HashShuffle源码的内容呢?原因有3点:1,在现在的实际生产环境下,很多人在用Spark1.5.x,...

  • Spark SQL 快速入门系列(七)Dataset (DataFrame) 的基础操作

    其实就是差集 val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.except(ds2).show() intersect 求得两个集合的交集 val ds1 = spark.range(1, 10) val ds2 = spark.range(5, 15) ds1.intersect(ds2)...

  • 第33课:彻底解密Spark 2.1.X中Shuffle 中Mapper端的源码实现

    第33课:彻底解密Spark 2.1.X中Shuffle 中Mapper端的源码实现本文根据家林大神系列课程编写 http://weibo.com/ilovepainsSpark是MapReduce思想的实现之一,在一个作业中,会把不同的计算按照不同的依赖关系分成不同...

  • Spark(三)-- SparkSQL扩展(数据操作) -- Column(二)

    import org.apache.spark.sql.functions._ val column2: sql.Column = col("name") (4)column 方法描述: 帮助我们创建 Column 对象 //4. column 必须导入functions val column3: sql.Column = ...

  • 20180807 - Spark快速大数据分析

    Spark快速大数据分析 概念 数据的两个方向: 数据科学:分析+建模(回答业务问题、挖掘潜在规律、辅助产品推荐) 数据处理:硬件(内存、集群)+软件(封装、接口、监控、优化) 框架 应用层: Spark...

  • Hive向SparkSQL迁移总结

    Hive & SparkSQL使用不同点 ... 在shell中提交hive -e 和spark-sql -e,spark-sql需要用""显式的把字符串引起来 spark-sql -e 执行时转义符号需要修改为[],而不可以使用// SparkSQL优化(Spark2...

  • Flink

    一般来说,Spark基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。而在海量数据的批处理领域,Spark能够处理的吞吐量更大,加上其完善的生态和成熟易用的...

  • DLink 流批一体技术架构及优势 | 滴普科技FastData系列解读

    传统的基于离线(比如 Hive)数仓有很高的成熟度和稳定性,但在一些时延要求比较高的场景,则需要借助实时数仓 Flink 的帮助,将延时降低到秒级(或分钟级),但两套并存的数仓架构,势必带来双倍的资源消耗和开发...

  • 大数据学习之Flink,10分钟带你初步了解Flink

    高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。...可以连接到最常用的存储系统,如 Apache Kafka、Apache Cassandra、Elasticsearch、 JDBC、Kinesis 和(分布式)文件系统,如 HDFS 和 S3。高可用。

  • 分布式系统

    垂直缩放只能将性能提升至最新的硬件功能。这些能力证明对于工作量适中到较大的技术公司是 不够 的。 关于水平缩放的最好的事情是,您无限制地扩展规模 - 只要性能下降,您只需添加另一台机器,最多可达到无限大...

  • Flink-介绍和快速上手

    是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和管道方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的...

  • 5.大数据生态圈成员和原理

    Lambda 架构整合离线计算和实时计算,融合不可变性(Immunability),读写分离和复杂性隔离等一系列架构原则,可集成Hadoop,Kafka,Storm,Spark,Hbase等各类大数据组件。 ...

  • flink笔记1(初识 Flink)

    (1) Flink 的核心特性 (2)分层 API 5、Flink vs Spark (1)数据处理架构 (2)数据模型和运行架构 (3)Spark 还是 Flink? 一、初识 Flink 1、概念 在 Flink 官网主页的顶部可以看到,项目的核心目标,是...

  • 第一课 大数据技术之Fink1.13的实战学习-部署使用和基础概念

    1.6 Flink 的特性总结 1.7 Flink与 Spark的比较 第二节 Flink 快速上手 2.1 环境准备 2.2 编写代码逻辑-批处理 2.3 流处理-文件数据 2.4 流处理-读取socket文本流 第三节 Flink 部署 3.1 快速启动一个 Flink 集群 ...

  • go 生成基于 graphql 服务器库.zip

    格奇尔根 首页 > 文件 > gqlgen是什么?gqlgen是一个 Go 库,用于轻松构建 GraphQL 服务器。gqlgen 基于 Schema 优先方法— 您可以使用 GraphQL Schema 定义语言来定义您的 API 。gqlgen 优先考虑类型安全— 您永远不应该看到map[string]interface{}这里。gqlgen 启用 Codegen — 我们生成无聊的部分,以便您可以专注于快速构建您的应用程序。还不太确定如何使用gqlgen?将gqlgen与其他 Go graphql实现进行比较快速启动初始化一个新的 go 模块mkdir examplecd examplego mod init example添加github.com/99designs/gqlgen到项目的 tools.goprintf '//go:build tools\npackage tools\nimport (_ "github.com/99designs/gqlgen"\n _ "github.com/99designs/gqlgen

  • 基于JAVA+SpringBoot+Vue+MySQL的社区物资交易互助平台 源码+数据库+论文(高分毕业设计).zip

    项目已获导师指导并通过的高分毕业设计项目,可作为课程设计和期末大作业,下载即用无需修改,项目完整确保可以运行。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。 项目都经过严格调试,确保可以运行!可以放心下载 技术组成 语言:java 开发环境:idea 数据库:MySql8.0 部署环境:maven 数据库工具:navicat

Global site tag (gtag.js) - Google Analytics