初学者刚开始写spark程序的时候,往往只注重实现相应的功能,而容易忽略采用何种实现方式能够实现最高的效率。本文后面讲详细阐述作者在实际项目中遇到的spark程序调优问题。
1. 下面这段代码的背景是这样的,panelFeatureMid1类型为RDD[(String, (scala.collection.mutable.HashMap[String,Double], (Option[String], Option[String])))],表示一个UUID(可以理解为cookie id)上出现的SPID(可以理解为网页上某个监测点号)及其频次的HashMap统计结果,后面的两个Option[String]中一个表示的是该UUID的人群属性信息(例如性别,年龄,教育程度,收入),另一个可以忽略。
现在需要以SPID及其频次为特征,训练人群属性的分类器。这就需要对SPID进行编号,下面的一段程序就是实现这个功能。
//step2: broadcast all unique spid and index them
val spidSet = panelFeatureMid1.mapPartitions(tp => {
val spidSet = tp.foldLeft(HashSet.empty[String])((s, elem) => s.union(elem._2._1.keySet))
spidSet.map(s => (s, 1)).toIterator
}).reduceByKey((a, b) => 1).map(tp => tp._1).collect
println("spidSet num:" + spidSet.size)
val indexSet = sc.broadcast(spidSet)
val panelFeatureMid = panelFeatureMid1.mapPartitions(tp =>
{var i = -1;
val indexMap = indexSet.value.foldLeft(HashMap.empty[String, Int])((s, elem) => {i = i+1; s.put(elem, i); s});
val out = tp.map(s => (s._1, (s._2._2, s._2._1.map(spid => (indexMap.get(spid._1).get, spid._2) ))))
out.toIterator
}).cache
个人经验:a. 在后面多个task中将使用到的RDD,需要调用cache函数保存在内存中
b. 对于规模不大,而又需要全局使用的数据集,可以作为广播变量broadcast出去
c. 在涉及需要全局数据参与map操作过程时,尽量使用mapPartitions
2. 在spark程序中,需要特别留意的是需要进行IO shuffle的操作,因为shuffle操作将导致RDD数据的网络IO,非常耗时。而其中join操作(包括left,right,full各种join)尤其令初学者容易陷入耗时的shuffle操作中,不能自拔,而对其产生畏惧。
下面的这段代码的背景是这样的:panel和cookieMapping都是RDD[(String, String)]类型,第一个String表示的是UUID(同上),第二个String分别表示人群属性和cookie mapping属性,需要将同一个UUID的人群属性和cookie mapping属性连接到一起,即对它们做full join。
val panel = getPanel(sc, panelDir + "/l" + month).repartition(numPartitions)
val cookieMapping = getCookieMapping(sc, new StringBuilder(cookieMappingDir).append("/").append(month).append("/*").toString)
.repartition(numPartitions)
//step 2: panel full outer join cookie mapping data, repartition and cache
val total = cookieMapping.fullOuterJoin(panel).cache
个人经验:a. 在进行join操作前,尽量确保参与join操作的两个RDD的分区数量相同,这样可以避免无谓的shuffle操作,同时在groupByKey和reduceByKey等操作中,也提供了分区数量参数,在这里设置分区数量,可以省略额外的repartition操作,如下例:
val uuids = sc.broadcast(total.map(tp => tp._1).collect.toSet)
println("uuids number = " + uuids.value.size)
println("driver after collect freeMem = " + Runtime.getRuntime().freeMemory() + " totalMem = " + Runtime.getRuntime().totalMemory())
val uuidSpid = sc.newAPIHadoopFile(new StringBuilder(monitorEtlDir).append("/").append(month).append("*/campaign*").toString,
classOf[MzSequenceFileInputFormat], classOf[LongWritable], classOf[Text])
.mapPartitions(tp => {
val uuidSet = uuids.value
val freeMem = Runtime.getRuntime().freeMemory()
val totalMem = Runtime.getRuntime().totalMemory()
println("map freeMem = " + freeMem + " totalMem = " + totalMem)
tp.flatMap(ts => {val items = ts._2.toString().split("\\^").map(s => {val kv = s.split("=")
(kv(0), kv(1))}).toMap
val uuid = items("uuid");
if(uuidSet.contains(uuid)) Iterator((uuid, items.get("p")))
else Iterator()})
}).groupByKey(numPartitions)
.map(tp => { val spidPv = tp._2.foldLeft(HashMap.empty[String, Double])(
(pv, p) => {val spid = p.get;
if(spid != null){
if(pv.contains(spid))
pv.put(spid, pv.get(spid).get+1)
else pv.put(spid, 1)}
pv })
(tp._1, spidPv) })
val panelFeatureMid1 = uuidSpid.join(total)
由于map,mapPartitions等操作不改变分区数量,所以这里可以确保做join时的两个RDD分区数量相同
相关推荐
本文将深入探讨Spark性能调优的关键技术和方法,特别是在资源分配、并行度调节等方面的具体实践策略,并针对数据倾斜这一常见问题提供全面的解决方案。 #### 二、资源分配优化 资源的有效分配是Spark性能调优的第一...
### Spark性能调优详解 #### 一、引言 随着大数据技术的发展,Apache Spark作为一款通用的大数据分析引擎,因其高效的数据处理能力而受到广泛青睐。然而,在实际应用中,为了充分发挥Spark的优势,对其进行合理的...
### Spark性能优化指南 #### 一、基础篇:开发调优与资源调优 ##### 1. 开发调优 **1.1 调优概述** ...通过对这些关键点的深入理解和有效实践,可以大幅提升Spark作业的执行效率,更好地满足大数据处理的需求。
为了更好地理解Spark的性能调优,首先需要明确一些基本概念: 1. **Worker与Executor:** 每一台主机(host)上可以运行多个worker进程,而每个worker进程下又可以启动多个executor线程。这些executor线程负责执行...
虽然文中未给出具体案例,但通常情况下,高级性能调优会涉及实际应用场景下的实践,比如在电商领域进行用户行为分析、金融领域的风险评估等。通过对特定场景的深入理解,结合上述调优技巧,可以有效提升Spark SQL的...
本文将深入探讨Java在大数据性能调优中的重要性、关键技术和实践策略。 首先,Java作为广泛应用于大数据处理的语言,其性能对整个大数据系统的效率有着直接影响。Java的跨平台特性使得它成为大数据框架如Hadoop、...
### Spark调优在Facebook的实践 #### Apache Spark在Facebook的应用 Apache Spark因其高效的数据处理能力,在Facebook这样的大型科技公司中扮演着重要的角色。Facebook利用Spark处理每天成千上万的任务,涉及数百...
通过以上配置和调优步骤,可以显著提升Hive on Spark的性能,实现更快的数据处理速度。在实践中还需要结合具体应用场景,不断调整优化策略,以达到最佳效果。此外,定期监控集群资源使用情况、调整配置参数,也是...
性能调优是Spark应用的关键环节,包括调整executor数量、内存分配、shuffle行为、数据序列化策略等。合理设置这些参数能显著提升Spark应用的效率。 **9. Spark的监控和日志管理** Spark提供了Web UI来监控应用程序...
- Spark性能调优策略:包括内存管理、持久化级别选择、广播变量使用、并行度设置、数据倾斜处理等。 - Spark容错机制:如RDD的不变性和分区概念,使得在节点失败时可以通过重新计算恢复数据。 此外,还可能涉及到...
与此同时,它也兼容批处理和流式处理,对于程序吞吐量和延迟都有较高要求,因此GC参数的调优在Spark应用实践中显得尤为重要。Spark是时下非常热门的大数据计算框架,以其卓越的性能优势、独特的架构、易用的用户接口...
7. **Spark性能调优**:书中会讲解如何优化Spark应用程序,包括配置参数、内存管理、数据序列化和缓存策略等。 8. **Spark部署**:Spark可以运行在多种模式下,如本地模式、standalone模式、Mesos、YARN或...
一般来说,监控和测量涉及到跟踪Spark应用的性能和资源使用情况,这可以帮助开发者和运维人员优化应用配置和性能调优。 最后,文档中还提到了“Linking with Spark”,可能是指通过API或库将Spark与其他系统或应用...
总的来说,Spark程序的示例测试程序为开发者提供了一个实践平台,通过Java API深入学习Spark的各个方面,包括数据处理、分布式计算和性能调优。通过不断地试验和调试,开发者可以更好地掌握Spark这一强大工具,提升...
总的来说,通过阅读《Spark大数据处理:技术、应用与性能优化》一书,读者将能掌握Spark的核心技术,了解如何在大数据项目中有效应用Spark,并学会如何对Spark应用进行性能调优,从而提升大数据处理的效率和效果。
14. **开发环境与工具**:介绍如何设置开发环境,使用Scala、Python或Java编写Spark程序,以及使用IDE如IntelliJ IDEA和Eclipse进行开发。 15. **调试与监控**:讲解如何使用Spark的监控工具如Spark UI和Grafana...
SparkContext是Spark应用程序的入口点,负责创建和管理RDD。Spark Job是由一系列Stage组成的,每个Stage包含多个Task,这些Task在集群中并行执行。 二、Spark架构设计 Spark采用主从架构,由Driver Program和Worker...
6. **Spark性能优化**:介绍如何调整Spark配置参数,如executor数量、内存分配、shuffle操作的优化等,以提高计算效率。 7. **案例研究**:通过具体的业务场景,如日志分析、用户行为分析、推荐系统等,展示Spark...
此外,还会讲解如何在不同环境中部署Spark,例如YARN、Mesos或Standalone模式,并讨论性能调优和监控技巧。 书中还可能介绍如何与其他Hadoop生态系统组件(如HDFS、HBase、Cassandra等)进行集成,以及如何利用...