阅读更多

0顶
0踩

互联网

原创新闻 浅谈Spark应用程序的性能调优

2016-01-19 10:51 by 副主编 mengyidan1988 评论(2) 有10863人浏览
Spark是基于内存的分布式计算引擎,以处理的高效和稳定著称。然而在实际的应用开发过程中,开发者还是会遇到种种问题,其中一大类就是和性能相关。在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。

分布式计算引擎在调优方面有四个主要关注方向,分别是CPU、内存、网络开销和I/O,其具体调优目标如下:
1.提高CPU利用率。
2.避免OOM。
3.降低网络开销。
4.减少I/O操作。
第1章 数据倾斜
数据倾斜意味着某一个或某几个Partition中的数据量特别的大,这意味着完成针对这几个Partition的计算需要耗费相当长的时间。

如果大量数据集中到某一个Partition,那么这个Partition在计算的时候就会成为瓶颈。图1是Spark应用程序执行并发的示意图,在Spark中,同一个应用程序的不同Stage是串行执行的,而同一Stage中的不同Task可以并发执行,Task数目由Partition数来决定,如果某一个Partition的数据量特别大,则相应的task完成时间会特别长,由此导致接下来的Stage无法开始,整个Job完成的时间就会非常长。

要避免数据倾斜的出现,一种方法就是选择合适的key,或者是自己定义相关的partitioner。在Spark中Block使用了ByteBuffer来存储数据,而ByteBuffer能够存储的最大数据量不超过2GB。如果某一个key有大量的数据,那么在调用cache或persist函数时就会碰到spark-1476这个异常。

下面列出的这些API会导致Shuffle操作,是数据倾斜可能发生的关键点所在
1. groupByKey
2. reduceByKey
3. aggregateByKey
4. sortByKey
5. join
6. cogroup
7. cartesian
8. coalesce
9. repartition
10. repartitionAndSortWithinPartitions



图1: Spark任务并发模型

 def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
  rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced, 
  // but for really large dataset, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
    .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) => 
        Iterator((i, iter.toIterable.last))).toArray().toList
      .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      def getPartition(key: Any): Int = 
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}

定义隐式的转换
implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] = 
    new DemoRDD[T](rdd)
  implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}

在spark-shell中就可以使用了
import RDDConversions._
yourRdd.grouped(5)

第2章 减少网络通信开销
Spark的Shuffle过程非常消耗资源,Shuffle过程意味着在相应的计算节点,要先将计算结果存储到磁盘,后续的Stage需要将上一个Stage的结果再次读入。数据的写入和读取意味着Disk I/O操作,与内存操作相比,Disk I/O操作是非常低效的。

使用iostat来查看disk i/o的使用情况,disk i/o操作频繁一般会伴随着cpu load很高。

如果数据和计算节点都在同一台机器上,那么可以避免网络开销,否则还要加上相应的网络开销。 使用iftop来查看网络带宽使用情况,看哪几个节点之间有大量的网络传输。
图2是Spark节点间数据传输的示意图,Spark Task的计算函数是通过Akka通道由Driver发送到Executor上,而Shuffle的数据则是通过Netty网络接口来实现。由于Akka通道中参数spark.akka.framesize决定了能够传输消息的最大值,所以应该避免在Spark Task中引入超大的局部变量。



图2: Spark节点间的数据传输

第1节 选择合适的并发数
为了提高Spark应用程序的效率,尽可能的提升CPU的利用率。并发数应该是可用CPU物理核数的两倍。在这里,并发数过低,CPU得不到充分的利用,并发数过大,由于spark是每一个task都要分发到计算结点,所以任务启动的开销会上升。

并发数的修改,通过配置参数来改变spark.default.parallelism,如果是sql的话,可能通过修改spark.sql.shuffle.partitions来修改。

第1项 Repartition vs. Coalesce

repartition和coalesce都能实现数据分区的动态调整,但需要注意的是repartition会导致shuffle操作,而coalesce不会。

第2节 reduceByKey vs. groupBy

groupBy操作应该尽可能的避免,第一是有可能造成大量的网络开销,第二是可能导致OOM。以WordCount为例来演示reduceByKey和groupBy的差异。
reduceByKey
    sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).reduceByKey(_ + _)




图3:reduceByKey的Shuffle过程

Shuffle过程如图2所示
groupByKey
    sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).groupByKey.map(r=>(r._1,r._2.sum))




图4:groupByKey的Shuffle过程


建议: 尽可能使用reduceByKey, aggregateByKey, foldByKey和combineByKey
假设有一RDD如下所示,求每个key的均值
val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )

方法一:reduceByKey
data.map(r=>(r._1, (r.2,1))).reduceByKey((a,b)=>(a._1 + b._1, a._2 + b._2)).map(r=>(r._1,(r._2._1/r._2._2)).foreach(println)

方法二:combineByKey
data.combineByKey(value=>(value,1),
    (x:(Double, Int), value:Double)=> (x._1+value, x._2 + 1),
    (x:(Double,Int), y:(Double, Int))=>(x._1 + y._1, x._2 + y._2))

第3节 BroadcastHashJoin vs. ShuffleHashJoin

在Join过程中,经常会遇到大表和小表的join. 为了提高效率可以使用BroadcastHashJoin, 预先将小表的内容广播到各个Executor, 这样将避免针对小表的Shuffle过程,从而极大的提高运行效率。

其实BroadCastHashJoin核心就是利用了BroadCast函数,如果理解清楚broadcast的优点,就能比较好的明白BroadcastHashJoin的优势所在。

以下是一个简单使用broadcast的示例程序。
val lst = 1 to 100 toList
val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2)
val broadcastLst = sc.broadcast(lst)
exampleRDD.filter(i=>broadcastLst.valuecontains(i)).collect.foreach(println)

第4节 map vs. mapPartitions

有时需要将计算结果存储到外部数据库,势必会建立到外部数据库的连接。应该尽可能的让更多的元素共享同一个数据连接而不是每一个元素的处理时都去建立数据库连接。
在这种情况下,mapPartitions和foreachPartitons将比map操作高效的多。

第5节 数据就地读取

移动计算的开销远远低于移动数据的开销。

Spark中每个Task都需要相应的输入数据,因此输入数据的位置对于Task的性能变得很重要。按照数据获取的速度来区分,由快到慢分别是:

1.PROCESS_LOCAL
2.NODE_LOCAL
3.RACK_LOCAL

Spark在Task执行的时候会尽优先考虑最快的数据获取方式,如果想尽可能的在更多的机器上启动Task,那么可以通过调低spark.locality.wait的值来实现, 默认值是3s。

除了HDFS,Spark能够支持的数据源越来越多,如Cassandra, HBase,MongoDB等知名的NoSQL数据库,随着Elasticsearch的日渐兴起,spark和elasticsearch组合起来提供高速的查询解决方案也成为一种有益的尝试。

上述提到的外部数据源面临的一个相同问题就是如何让spark快速读取其中的数据, 尽可能的将计算结点和数据结点部署在一起是达到该目标的基本方法,比如在部署Hadoop集群的时候,可以将HDFS的DataNode和Spark Worker共享一台机器。

以cassandra为例,如果Spark的部署和Cassandra的机器有部分重叠,那么在读取Cassandra中数据的时候,通过调低spark.locality.wait就可以在没有部署Cassandra的机器上启动Spark Task。

对于Cassandra, 可以在部署Cassandra的机器上部署Spark Worker,需要注意的是Cassandra的compaction操作会极大的消耗CPU,因此在为Spark Worker配置CPU核数时,需要将这些因素综合在一起进行考虑。

这一部分的代码逻辑可以参考源码TaskSetManager::addPendingTask
private def addPendingTask(index: Int, readding: Boolean = false) {
  // Utility method that adds `index` to a list only if readding=false or it's not already there
  def addTo(list: ArrayBuffer[Int]) {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
      case e: HDFSCacheTaskLocation => {
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) => {
            for (e <- set) {
              addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          }
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
        }
      }
      case _ => Unit
    }
    addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
    for (rack <- sched.getRackForHost(loc.host)) {
      addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    addTo(pendingTasksWithNoPrefs)
  }

  if (!readding) {
    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }
}

如果准备让spark支持新的存储源,进而开发相应的RDD,与位置相关的部分就是自定义getPreferredLocations函数,以elasticsearch-hadoop中的EsRDD为例,其代码实现如下。
override def getPreferredLocations(split: Partition): Seq[String] = {
  val esSplit = split.asInstanceOf[EsPartition]
  val ip = esSplit.esPartition.nodeIp
  if (ip != null) Seq(ip) else Nil
}

第6节 序列化

使用好的序列化算法能够提高运行速度,同时能够减少内存的使用。

Spark在Shuffle的时候要将数据先存储到磁盘中,存储的内容是经过序列化的。序列化的过程牵涉到两大基本考虑的因素,一是序列化的速度,二是序列化后内容所占用的大小。

kryoSerializer与默认的javaSerializer相比,在序列化速度和序列化结果的大小方面都具有极大的优势。所以建议在应用程序配置中使用KryoSerializer.
spark.serializer  org.apache.spark.serializer.KryoSerializer

默认的cache没有对缓存的对象进行序列化,使用的StorageLevel是MEMORY_ONLY,这意味着要占用比较大的内存。可以通过指定persist中的参数来对缓存内容进行序列化。
exampleRDD.persist(MEMORY_ONLY_SER)

需要特别指出的是persist函数是等到job执行的时候才会将数据缓存起来,属于延迟执行; 而unpersist函数则是立即执行,缓存会被立即清除。
  • 大小: 25.1 KB
  • 大小: 7.4 KB
  • 大小: 50 KB
  • 大小: 47.1 KB
0
0
评论 共 2 条 请登录后发表评论
1 楼 随WW便 2016-01-20 18:45
good!

发表评论

您还没有登录,请您登录后再发表评论

相关推荐

  • 在VB.net中为DATAGRID控件增加一个删除按钮的一种方法

    当你在开发ASP.NET Web应用程序时,会遇到这样的情况: 你需要给一个ASPX页面呈现的HTML添加客户端脚本代码。 有两种不同的方法可以让你在运行时添加这个代码。第一种方法就是运用Page控件的RegisterClientScriptBlock方法, 它可以让你在运行时给一个ASPX页面添加代码。该方法有两个参数——key和script。 参数key是唯一的,可以识别你添加到页面的每个sc

  • 为DataGrid 加上删除确认对话框

    private void DataGrid1_ItemDataBound(object sender, System.Web.UI.WebControls.DataGridItemEventArgs e)  {   if(e.Item.ItemType==ListItemType.Item || e.Item.ItemType==ListItemType.AlternatingItem)   { ...

  • 在DataGrid控件中弹出删除确认框

    此时要用到DataGrid控件的ItemDataBound事件了,该事件表示每绑定完一行记录,就会触发相应的事件过程。在其中就可以想办法找到删除按钮,并给它添加JavaScript事件 e.Item.ItemType表示当前行的类型,DataGrid控件一般有标题行(ListItemT

  • 浅谈 Spark 应用程序的性能调优

    在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。 分布式计算引擎在调优方面有四个主要关注方向,分别是CPU、内存、网络开销和I/O,其具体调优目标如下: 提高CPU利用率。 避免O...

  • 浅谈Spark应用程序的性能调优(转)

    在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。 分布式计算引擎在调优方面有四个主要关注方向,分别是CPU、内存、网络开销和I/O,其具体调优目标如下: 1.提高CPU利用率。 2.避免OOM。 3.降低...

  • 浅谈spark性能调优

    浅谈Spark应用程序的性能调优 2016-01-19 10:51by 副主编mengyidan1988评论(2)有5574人浏览 Sparkcassandra 声明:ITeye资讯文章的版权属于ITeye网站所有,严禁任何网站转载本文,否则必将追究法律责任! Spark...

  • 浅谈Spark-内存管理

    spark内存管理

  • Hadoop性能调优、YARN的内存和CPU配置

    转 Hadoop性能调优、YARN的内存和CPU配置 2018年06月12日 21:01:54 toto1297488504 阅读数:2417 ...

  • 浅谈机器学习生命周期平台MLflow

    MLflow 提供了一组轻量级 API,可用于任何现有的机器学习应用程序或库(TensorFlow、PyTorch、XGBoost 等),无论您当前在何处运行 ML 代码(例如:在笔记本电脑、独立应用程序或云平台中)。 机器学习工作流程 机器...

  • 运维角度浅谈MySQL数据库优化

    这篇博文主要谈MySQL数据库发展周期中所面临的问题及优化方案,暂且抛开前端应用不说,大致分为以下五个阶段:1、数据库表设计 项目立项后,开发部根据产品部需求开发项目,开发工程师工作其中一部分就是对表结构...

  • 浅谈 Apache Doris FE 处理查询 SQL 源码解析

    在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE ...

  • 【转载】运维角度浅谈MySQL数据库优化

    运维角度浅谈MySQL数据库优化 2015-06-02 14:22:02 标签:mysql优化 mysql分库分表分区 mysql读...

  • 分布式计算浅谈

    读者至少应该掌握:1 分布式计算不仅仅是一种方式。 2 当前可用分布式至少有三种框架:分治框架、流框架、流水线框架。3 应用范围: 工业互联网络、云计算、物联网、大数据、并行计算、人工智能。.........

  • COMSOL模拟碳酸钙岩石与盐酸反应的随机孔隙酸化路径及布林克曼流动形成的分形结构

    内容概要:本文详细介绍了利用COMSOL软件模拟碳酸钙(CaCO3)在岩石中与盐酸(HCl)反应过程中产生的随机孔隙酸化路径及其形成的布林克曼流动。首先,通过蒙特卡洛方法生成随机孔隙分布,模拟真实岩石内部复杂的孔隙结构。接着,采用布林克曼方程处理多孔介质中的粘性力和渗透流动,并引入化学反应模块,模拟CaCO3与HCl之间的化学反应。随着模拟的进行,酸液流动路径逐渐形成类似雪花状的分形结构,展示了流动与溶解之间的动态博弈。最后,通过自适应网格技术和粒子追踪功能,精确捕捉并可视化这些精美的分形图案。 适合人群:从事地质工程、材料科学、化学工程等领域研究的专业人士,以及对多孔介质传输现象感兴趣的科研工作者。 使用场景及目标:适用于研究多孔介质内的化学反应和流体流动特性,特别是对于优化石油开采中的酸化压裂工艺具有重要指导意义。 其他说明:文中提供了详细的MATLAB和COMSOL代码片段,帮助读者理解和重现模拟过程。此外,强调了随机性和确定性在微观尺度上的相互作用,揭示了自然界深层次的规律。

  • 基于滑模控制的永磁同步电机直接转矩控制仿真建模与实现

    内容概要:本文详细介绍了将滑模控制(SMC)应用于永磁同步电机(PMSM)直接转矩控制(DTC)的技术细节。首先解释了转矩和磁链误差计算方法,接着探讨了滑模面的设计及其对系统抖振的影响。文中还提供了扇区矢量选择的具体实现方式,并深入讨论了磁链观测器的改进措施。此外,文章分析了滑模控制器的设计要点以及仿真过程中需要注意的关键参数配置。通过对比传统PI控制,验证了滑模控制在提高系统鲁棒性和快速响应方面的优势。 适合人群:从事电机控制系统研究的专业人士,尤其是对永磁同步电机直接转矩控制感兴趣的科研工作者和技术人员。 使用场景及目标:适用于希望深入了解并掌握滑模控制理论及其在PMSM-DTC应用中的具体实现方法的研究人员。目标是在实际项目中能够运用滑模控制提升系统的稳定性和性能。 其他说明:文中提供的MATLAB/Simulink代码片段有助于读者更好地理解和复现实验结果。同时提醒读者关注一些常见的陷阱,如参数选择不当可能导致的问题。

  • 北京大学网络安全工作人员管理规定:涵盖人员职责、聘用、转岗离岗、教育培训及第三方管理

    内容概要:本文详细介绍了北京大学针对网络安全工作人员的管理规定,旨在加强网络安全管理和明确不同角色的责任。全文分为九章,涵盖了网络安全工作人员及其职责、聘用管理、转岗和离岗管理、教育培训、第三方人员管理及奖惩措施等方面的内容。重点在于明确各级单位和人员的具体职责,确保网络安全制度的有效执行,并强调了对第三方人员的严格管控和保密要求。 适合人群:适用于高校网络安全管理人员及相关技术人员,尤其是北京大学及其下属单位的网络安全工作者。 使用场景及目标:①帮助高校建立健全网络安全管理体系;②指导网络安全工作人员明确自身职责,提高工作效率;③规范第三方人员的访问和操作,降低安全风险。 其他说明:本文还提供了多个附件,如网络安全承诺书、访问申请表和保密协议模板,便于实际操作和管理。

  • 网络设备市场现状与发展趋势分析(2024-2030年)-技术革新与智能化应用

    内容概要:本文深入探讨了中国网络设备市场的现状及其未来发展潜力。首先介绍了网络设备的基本概念及其作为现代通信网络基础设施的重要地位,随后分析了当前市场面临的挑战和技术进步带来的机遇。文中特别强调了5G、物联网、云计算等新兴技术对网络设备性能和安全性的更高要求,以及由此催生的高带宽、低延迟产品的市场需求。此外,还讨论了软件定义网络(SDN)、网络功能虚拟化(NFV)、边缘计算等新技术的应用前景,指出未来网络设备将更加智能化、自动化,并能更好地支持AI和ML技术。最后,通过对多家领先企业的案例研究,展示了行业内竞争态势及各公司在技术创新方面的努力。 适用人群:从事网络设备相关领域的研究人员、工程师、管理人员,以及关注该领域发展的投资者。 使用场景及目标:帮助读者了解网络设备行业的最新动态和技术趋势,为制定战略决策提供依据;同时为企业和个人投资者提供市场洞察,辅助其做出合理的投资选择。 其他说明:报告基于详实的数据分析和专家意见撰写而成,旨在为专业人士提供有价值的参考资料。

Global site tag (gtag.js) - Google Analytics