spark中有几个算子比较重要,开发中不是很常用,但很多算子的底层都是依靠这几个算子实现的,比如CombineByKey,像reduceByKey底层是combineByKey实现的。
首先介绍combineByKey
这个算子 主要需要三个参数,第一个是对每个分区中每个key的第一个值 进行初始化,也就是每个分区内,有多少个key就会执行多少次这个初始化
object CombineByKeyTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(("a","2"),("a","3"),("a","4"),("b","3"),("b","4")),2)
rdd1.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
val rdd2=rdd1.combineByKey(x=>{println("fff=="+x);(x.toInt*2).toString()},(x1:String,x2:String)=>(x1.toInt+x2.toInt).toString(),(s1:String,s2:String)=>(s1.toInt+s2.toInt).toString())
rdd2.foreachPartition(f=>{
while(f.hasNext){
print(f.next())
}
})
sc.stop()
}
第一个分区为(a,2)(a,3),第二个分区为(a,4)(b,3)(b,4)
对于第一个分区 ,首先执行第一步,即(a,4),然后聚合,得出(a,7)
对于第二个分区,首先执行第一步,(a,8),(b,6),(b,4)然后聚合得出(a,8),(b,10)
最后,对所有分区进行聚合,调用第三个参数对于的函数,得出(a,15),(b,10)
接下来介绍Aggregate
这个算子主要是三个参数,第一个参数是初始值,第二个参数进行分区内聚合,第三个参数是对每个分区最后的结果进行聚合
object AggregateTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List("1","2","3","4","5","6","7"),2)
rdd1.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
//aggregate操作
val result=rdd1.aggregate("3")(seqOphyj,combOphyj)
println(result)
sc.stop()
}
//每个分区中 每个元素 乘以2 然后相加 进行分区内聚合操作
def seqOphyj(s1:String,s2:String):String={
val ss1=(2*s1.toInt+2*s2.toInt).toString()
ss1
}
//每个分区的结果 与初始值 进行相加操作
def combOphyj(s1:String,s2:String):String={
val ss1=(s1.toInt+s2.toInt).toString()
ss1
}
}
第一个分区为1,2,3 第二个分区为4,5,6,7
对于第一个分区,首先进行第二个函数 即3*2+1*2=8 8*2+2*2=20 20*2+3*2=46
对于第二个分区,首先进行第二个函数, 3*2+4*2=14 14*2+5*2=38 38*2+6*2=88 88*2+7*2=190
然后进行调用第三个函数 3+46+190=239
接下来是AggregateByKey
这个算子基本和Aggregate类似,是对相同key的value进行聚合,但还是有区别的,区别在下面说。
object AggregateByKeyTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(("1","2"),("1","3"),("1","4"),("2","3"),("2","4")),2)
rdd1.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
val rdd2=rdd1.aggregateByKey("2")(seqOphyj, combOphyj)
rdd2.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
sc.stop()
}
//每个分区中 每个元素 乘以2 然后相加 进行分区内聚合操作
def seqOphyj(s1:String,s2:String):String={
val ss1=(2*s1.toInt+2*s2.toInt).toString()
println("seq="+s1+"&&"+s2+"====="+ss1)
ss1
}
//每个分区的结果 与初始值 进行相加操作
def combOphyj(s1:String,s2:String):String={
val ss1=(s1.toInt+s2.toInt).toString()
println("com="+s1+"&&"+s2+"====="+ss1)
ss1
}
}
第一个分区为(1,2)(1,3),第二个分区为(1,4)(2,3)(2,4)
首先计算第一个分区。对于key1,value为(2*2+2*2)*2+3*2=22
再计算第二个分区,对于key1,value为2*2+4*2=12,对于key2,value为(2*2+3*2)*2+4*2=28
计算第三个方法,为(1,34)(2,28),它与Aggregate的不同点在于第三个函数执行的时候。默认值不参与运算,而aggregate是参与的。
分享到:
相关推荐
Spark 算子可以分为以下几类: 1. 转换算子(Transformation):将一个RDD 转换为另一个RDD,例如 map、filter、flatmap 等。 2. 动作算子(Action):将RDD 转换为非RDD 的结果,例如 count、collect、foreach 等...
下面将详细介绍几个常用的 Transformation 操作。 ##### 1.1 map `map` 是一种常见的 Transformation,它接收一个函数作为参数,并将这个函数应用到 RDD 的每个元素上,从而产生一个新的 RDD。例如,如果原始 RDD ...
本文将围绕“Spark性能调优的几大原则”这一主题,详细介绍八大核心原则及其应用场景。 #### 二、避免创建重复的RDD **原则概述:** 在开发Spark作业时,首先需要基于某个数据源(例如Hive表或HDFS文件)创建一个...
在Spark 1.4中,数据处理的流程通常包括以下几个步骤: - **初始化SparkContext**: 创建SparkContext对象,这是所有Spark操作的入口点。 - **加载数据**: 从文件系统、数据库或其他来源加载数据,生成RDD、DataFrame...
RDD 具有以下几个特点: - **容错性**:RDD 能够自动恢复丢失的数据分区。 - **可缓存**:数据可以保存在内存中,以提高重复使用的性能。 - **血统**:每个 RDD 都记录了它是由哪些操作产生的,这种依赖关系被...
在开发Spark应用程序时,有几个基本原则需要遵守: - **RDD Lineage设计**:合理设计RDD之间的依赖关系,即RDD Lineage,是确保高效执行的关键。一个良好的RDD Lineage可以减少不必要的数据处理步骤,从而提高整体...
开发时调优方面,Spark开发者需要关注几个关键点。首先,为了提高效率,应避免在应用中创建重复的数据集,并尽可能地复用同一个数据集。对于重复使用的数据集,Spark提供了持久化机制(persistence),以优化内存...
关于Spark Streaming在滴滴的ETL实践收益,主要有以下几个方面:能够服务后续的上百个模块;数据清洗需求可以在分钟级别完成;并且承担了90%的实时数据清洗任务。 在总结中,滴滴强调了日志规范化来提高ETL的效率,...
为了最大化Spark的性能,我们需要关注以下几个方面: 1. 内存管理:合理设置Executor内存大小,避免溢出和频繁磁盘IO。 2. 广播变量:使用广播变量减少数据在网络间传输。 3. 复用算子:尽量使用map而不是flatMap,...
3. **Bypass Merge SortShuffle**: 当满足特定条件(map任务数量小于`spark.shuffle.sort.bypassMergeThreshold`且非聚合类shuffle算子)时,Spark会使用bypass机制,跳过排序过程。每个task会为每个reducer创建一个...
如果您的 Spark 作业处理的数据量非常大,达到几亿的数据量,此时运行 Spark 作业会时不时地报错,例如 shuffle output file cannot find,executor lost,task lost,out of memory 等,这可能是 Executor 的堆外...
Spark的运行架构主要包括以下几个关键组件: - **Cluster Manager (Master)**:在standalone模式下是Master节点,负责管理整个集群;在YARN或Mesos模式下,则由相应的资源管理器担任该角色。 - **Worker节点**:从...
本文将深入探讨几个关键的调优策略,包括合理使用广播变量、选择正确的RDD持久化策略、避免不必要的shuffle操作以及使用高效的算子。 首先,广播变量在Spark中扮演着关键角色。当一个任务需要频繁访问大体积的共享...
源码可能包含以下几个部分:数据读取(如从HDFS或本地文件系统读取日志文件),数据处理(执行上述的预处理和分析操作),以及结果的保存或可视化。 为了运行和复现这个项目,你需要一个Spark环境,可以是本地模式...
在音乐专辑数据挖掘中,我们可能关心以下几个方面: 1. **用户行为分析**:通过分析用户的播放记录,可以挖掘出用户的喜好模式,如最常听的艺术家、最热衷的音乐类型等。这有助于推荐系统的设计和优化。 2. **时间...
Spark调度架构原理可以分为以下几个步骤: 1. 启动Spark集群:启动Master和多个Worker节点,Master节点主要负责管理和监控集群,Worker节点主要负责执行应用程序的任务。 2. Master节点对Worker节点的心跳机制:...
RDD的主要特性包括以下几个方面: 1. **分片列表**:RDD是由多个分区(Partitions)组成的,每个分区代表数据集的一个部分,可以在集群的不同节点上并行处理。分片是计算的基本单位。 2. **计算函数**:每个分片都...
- **解释**:RDD 的高效性来源于以下几个方面: - **不可变性**:RDD 一旦创建后不可修改,这使得它们适合于并行计算环境。 - **Lazy 计算**:RDD 的 Transformation 操作不会立即执行,只有当 Action 操作触发时...
在学习Spark RDD编程时,还需要关注以下几个关键点: 1. **分区与并行度**:RDD的分区决定了数据在集群中的分布,合理的分区策略可以提高并行计算的效率。可以通过`repartition`或`coalesce`方法调整分区数量。 2....
本文将探讨关于DataFrame和Spark SQL在数据取值时可能遇到的几个误区。 首先,需要明确的是,DataFrame在Spark中并不是返回一个具体对象,而是返回一个数据集。这意味着当我们使用DataFrame进行数据操作时,每一次...