最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:
````
java.lang.OutOfMemoryError: Java heap space
````
这种写法的代码一般如下:
````
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*")
//在驱动程序获取结果集
val datas=ArrayBuffer[String]()
//把所有数据,拉倒驱动端操作
rdd.collect.foreach(line=>{
datas += line.split('#')(1) //得到某个字段
})
sc.stop()
````
上面的这种写法,基本原理就是一次性把所有分区的数据,全部读取到driver节点上,然后开始做处理,所以数据量大的时候,经常会出现内存溢出情况。
(问题一)如何避免这种情况?
分而治之,每次只拉取一个分区的数据到驱动节点上,处理完之后,再处理下一个分数据的数据。
(问题二)如果单个分区的数据已经大到内存装不下怎么办?
给数据集增加更多的分区,让大分区变成多个小分区。
(问题三)如果结果集数据大于内存的大小怎么办?
要么增加驱动节点的内存,要么给每个分区的数据都持久化本地文件上,不再内存中维护
下面来看下关键问题,如何修改spark的rdd分区数量?
我们知道在spark里面RDD是数据源的抽象模型,RDD里面实际上是把一份大数据源切分成了多个分区数据,然后来并行处理这份大数据集。
默认情况下如果Spark从HDFS上加载数据,默认分区个数是按照HDFS的block size来切分的,当然我们在加载的时候可以指定的分区个数。
````
textFile(path,partitionNums)//第二个参数可以指定分区个数
````
如果在加载时不指定分区个数,spark里面还提供了两个函数来进行重分区:
````
(1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T]
(2)def repartition(numPartitions: Int):RDD[T]
````
接着我们来看下coalesce函数和repartition函数的区别:
通过查看源码得知repartition函数内部实际上是调用了coalesce函数第二个参数等于true时的封装。所以我们重点来关注下coalesce函数即可:
coalesce的第一个参数是修改后的分区个数
coalesce的第二个参数是控制是否需要shuffle
举一个例子:
当前我们RDD的分区个数是100:
(1)如果要变成10,应该使用
````
rdd.coalesce(10,false)
````
(2)如果要变成300,应该使用
````
rdd.coalesce(300,true)
````
(3)如果要变成1,应该使用
````
rdd.coalesce(1,true)
````
这里解释一下:
分区数从多变少,一般是不需要开启shuffle的,这样性能最高,因为不需要跨网络混洗数据,当然你也可以开启shuffle在特定场景下,如分区数据极其不均衡。但建议一般不要使用。
分区数从少变多,必须开启shuffle,如果不开启那么分区数据是不会改变的,由少变多必须得重新混洗数据才能变多,这里需要注意一点,如果数据量特别少,那么会有一些分区的数据是空。
最后的例子是一种极端场景,如果从多变成1,不开启shuffle,那么可能就个别节点计算压力特别大,集群资源不能充分利用,所以有必要开启shuffle,加速合并计算的流程。
明白了如何改变rdd的分区个数之后,我们就可以文章开头遇到的问题结合起来,拉取大量数据到驱动节点上,如果整体数据集太大,我们就可以增加分区个数,循环拉取,但这里面需要根据具体的场景来设置分区个数,因为分区个数越多,在spark里面生成的task数目就越多,task数目太多也会影响实际的拉取效率,在本案例中,从hdfs上读取的数据默认是144个分区,大约1G多点数据,没有修改分区个数的情况下处理时间大约10分钟,在调整分区个数为10的情况下,拉取时间大约在1-2分钟之间,所以要根据实际情况进行调整。
文章开始前的代码优化后的如下:
````
def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
if(seq==idx) ds else Iterator()
}
------------------------------
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*")
//在驱动程序获取结果集
val datas=ArrayBuffer[String]()
//重分区并合理优化分区个数
val new_rdd=rdd.coalesce(10)
//得到所有的分区信息
val parts= new_rdd.partitions
//循环处理每个分区的数据,避免导致OOM
for(p<-parts){
//获取分区号
val idx=p.index
//第二个参数为true,避免重新shuffle,保留原始分区数据
val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
//读取结果数据
val data=parRdd.collect()
//循环处理数据
for(line<-data){
datas += line.split('#')(1) //得到某个字段
}
}
````
最后在看下,spark任务的提交命令:
````
spark-submit --class SparkHdfsDataAnalysis
--conf spark.driver.maxResultSize=1g
--master yarn
--executor-cores 5
--driver-memory 2g
--executor-memory 3g
--num-executors 10
--jars $jars spark-analysis.jar $1 $2
````
这里面主要关注参数:
````
spark.driver.maxResultSize=1g
driver-memory 2g
````
单次拉取数据结果集的最大字节数,以及驱动节点的内存,如果在进行大结果集下拉时,需要特别注意下这两个参数的设置。
参考文档:
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD
https://spark.apache.org/docs/latest/configuration.html
https://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
《Spark 2.x + Python 大数据机器学习实战》是一门深入探讨如何利用Apache Spark 2.x和Python进行大数据分析和机器学习的课程。Spark作为分布式计算框架,因其高效的内存计算和灵活的数据处理能力,成为了大数据领域...
1. 数据读取模块:负责从数据源(如HDFS、数据库或本地文件)加载用户评价数据。 2. 数据预处理模块:清洗数据,处理缺失值,进行特征工程。 3. 分析与建模模块:进行用户行为分析,构建推荐模型。 4. 结果展示模块...
MLlib还通过了优化的底层算法,使得Spark的机器学习算法能高效地运行在大规模数据集上。 3. SparkSession和SparkSession API SparkSession是Spark 2.0引入的全新入口点,用于取代旧有的SparkContext和SQLContext。...
Spark框架为处理大数据提供了一个强大的平台,特别适合于需要迭代计算的场景,比如机器学习算法。它支持多种数据源,包括HDFS、Cassandra、HBase和本地文件系统等。而在这个文档中,我们将学习如何使用Spark来处理...
Spark2.3.x在SQL支持上的改进和大数据离线分析能力的提升,使得它成为企业级大数据项目的重要工具。通过理解Spark SQL的工作原理,熟练掌握DataFrame和Dataset API,以及进行有效的性能调优,我们可以构建高效的大...
在2018年的Spark Summit大会上,Sameer Agarwal介绍了Apache Spark 2.3版本的一些关键特性。作为Spark的Committer和2.3版本的发布经理,Sameer在Facebook(Big Compute)担任软件工程师,并曾在Databricks和加州大学...
Spark是Apache软件基金会下的一个开源大数据处理框架,以其高效、灵活和易用的特性在大数据领域备受推崇。这个"Spark各方面学习合集"包含了丰富的资源,帮助初学者和经验丰富的开发者深入理解Spark的核心概念和应用...
在.NET 5 中体验大数据和机器学习,开发者可以借助Apache Spark的.NET实现——.NET for Spark。Apache Spark是一个强大的开源分析引擎,专为处理大规模数据而设计,强调内存计算以提高性能,并支持分布式并行处理。...
Spark SQL可以直接读取Hive Metastore中的表,使得Hive用户可以无缝过渡到Spark,享受Spark的高性能查询能力。 10. Spark Streaming与Kafka集成: Spark Streaming可以与Kafka等消息队列系统集成,实现实时数据流...
Spark框架是加州大学伯克利分校AMP实验室开发的一种通用内存并行计算框架。...随着大数据时代的到来,Spark将继续发挥其在内存计算、实时处理和机器学习等领域的技术优势,成为一个非常重要的大数据处理工具。
4. **安装与配置**: 使用这个压缩包,首先需要将其解压到服务器或本地机器的适当位置。接着,配置环境变量以指向Spark的安装目录,这通常涉及修改`PATH`和`SPARK_HOME`变量。为了与Hadoop集群对接,还需要配置Spark...
Spark与Hadoop的集成,使得Spark可以无缝地读取和写入Hadoop的数据,进一步增强了其在大数据领域的应用。 Spark的核心特性包括: 1. **弹性分布式数据集(Resilient Distributed Datasets, RDD)**:RDD是Spark的...
4. **数据读写**:学习如何使用PySpark读取各种格式的数据,如CSV、JSON、Parquet或HDFS上的文件,以及如何将处理后的数据保存。 5. **数据转换**:掌握常见的数据转换操作,如map、filter、reduceByKey、join、...
- **RDD(弹性分布式数据集)**:Spark的基础数据结构,它是不可变的、分区的、可缓存的数据集合。RDD提供了并行操作和容错能力。 - **DAG(有向无环图)**:Spark作业的执行计划以DAG形式表示,每个任务由一系列...
- 从本地文件系统、HDFS等数据源读取:`sc.textFile("path/to/file")` - 从已有的集合转换:`val data = Array(1, 2, 3); val rdd = sc.parallelize(data)` **3. 使用Java创建SparkContext和RDD对象** - **创建...
8. **本地模式运行**:在本地模式下,Spark会在单机上模拟多节点环境,这对于测试和开发非常方便。通过设置`master`参数为`"local"`或`"local[*]"`(*表示使用所有CPU核心)可以在本地运行Spark程序。 9. **错误...
1. **解压文件**:首先,你需要将"spark-1.6.0-bin-hadoop2.4.tgz"解压缩到本地目录,这会得到一个名为"spark-1.6.0-bin-hadoop2.4"的文件夹。 2. **配置环境变量**:在系统的环境变量设置中,添加SPARK_HOME指向...
- 然后,客户端直接连接到存储数据块的datanode读取所需数据块。 3. **HDFS的体系结构**: - **NameNode**:管理文件系统的元数据,记录每个文件中各个块所在的数据节点信息。 - **SecondaryNameNode**:辅助...
Spark架构的核心是RDD(弹性分布式数据集),这是一种不可变、分区的记录集合,可以分布在集群的不同节点上。RDD支持转换(transformations)和行动(actions)两种操作。转换生成新的RDD,不触发计算;行动则会触发...
本书全面覆盖了Spark的核心概念、原理和实战应用,旨在帮助读者从初学者到熟练掌握Spark的大数据处理能力。Spark作为当前大数据处理领域的热门框架,其高效的数据处理速度和强大的分布式计算能力使其在业界广受欢迎...