`
bit1129
  • 浏览: 1069538 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark九十七】RDD API之aggregateByKey

 
阅读更多

1. aggregateByKey的运行机制

 

  /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
  }

从aggregateByKey的源代码中,可以看出

a.aggregateByKey把类型为(K,V)的RDD转换为类型为(K,U)的RDD,V和U的类型可以不一样,这一点跟combineByKey是一样的,即返回的二元组的值类型可以不一样

b.aggregateByKey内部是通过调用combineByKey实现的,combineByKey的createCombiner函数逻辑由zeroValue这个变量实现,zeroValue作为聚合的初始值,通常对于加法聚合则为0,乘法聚合则为1,集合操作则为空集合

c.seqOp在combineByKey中的功能是mergeValues,(U,V)=>U

d.combOp在combineByKey中的功能是mergeCombiners

 

 

2. aggregateByKey举例

2.1 求均值

 

val rdd = sc.textFile("气象数据")  
val rdd2 = rdd.map(x=>x.split(" ")).map(x => (x(0).substring("从年月日中提取年月"),x(1).toInt))  
val zeroValue = (0,0) 
val seqOp= (u:(Int, Int), v:Int) => {  
 (u._1 + v, u._2 + 1)  
}  
  
val compOp= (c1:(Int,Int),c2:(Int,Int))=>{  
  (u1._1 + u2._1, u1._2 + u2._2)  
}  
  
  
val vdd3 = vdd2.aggregateByKey(  
zeroValue ,  
seqOp,  
compOp
)  
  
rdd3.foreach(x=>println(x._1 + ": average tempreture is " + x._2._1/x._2._2) 

 

从求均值的实现来看,aggregate通过提供零值的方式,避免了combineByKey中的createCombiner步骤(createCombiner本质工作就是遇到第一个key时进行初始化操作,这个初始化不是提供零值,而是对第一个(k,v)进行转换得到c的初始值))

 

 

 

 

分享到:
评论

相关推荐

    spark-rdd-APi

    标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在本文中,将详细介绍Spark的RDD API,以及如何在RDD、DataFrame和Dataset之间相互转换,以及如何使用Spark SQL进行操作。 首先来看RDD API的聚合操作aggregate。aggregate函数是RDD API中非常重要的一个动作操作,...

    spark rdd api

    本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    Spark学习--RDD编码

    在Java中,函数需要作为实现了Spark的org.apache,spark.api.java.function包中的任一函数接口的对象传递。 函数名 实现的方法 用途 Function, R> R call(T) 接收一个输入值并返回一个输出值,用于类似map() 和filter...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    电影评分数据汇总(使用spark2.4+scala, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    spark API RDD

    Spark提供了多种高级API,其中RDD(Resilient Distributed Dataset,弹性分布式数据集)是其核心抽象之一,代表了一个不可变、分区的数据集,可以进行并行操作。本文将详细介绍Spark中的RDD API,这些知识点对初学者...

    大数据spark学习之rdd概述

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...

    spark_API文档

    Spark API文档是大数据领域中非常重要的参考资料,它涵盖了Apache Spark的核心功能和编程接口。Spark作为一个快速、通用且可扩展的数据处理引擎,广泛应用于数据分析、机器学习以及实时流处理等多个场景。Scala是...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    10 实战解析spark运行原理和RDD解密

    "Spark 运行原理和 RDD 解密" Spark 是一个分布式计算框架,基于内存和磁盘,特别适合于迭代计算。Spark 的运行原理可以分为两大部分:Driver 端和 Executor 端。Driver 端负责提交任务,Executor 端负责执行任务...

    spark RDD 论文 中文版

    Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在大数据处理中的优势,并通过具体的案例来证明其有效性...

    Spark分布式计算和RDD模型研究.docx

    Spark分布式计算和RDD模型是大数据处理领域的重要技术,它针对MapReduce等传统框架在处理迭代式算法和数据重用方面的不足进行了创新。RDD(Resilient Distributed Datasets)是Spark的核心概念,它是一种弹性分布式...

    Spark分布式计算和RDD模型研究.pdf

    Spark分布式计算和RDD模型是大数据处理领域的重要技术,它针对MapReduce等传统框架存在的不足,尤其是在数据重用和效率方面的问题进行了创新。RDD(Resilient Distributed Datasets)是Spark的核心概念,它是一种...

    Spark2.0.2API

    在Spark 2.0.2中,RDD API已经相当成熟,可以进行创建、转换和行动等操作,支持数据的容错和高效计算。此外,DataFrame和Dataset API的引入,为数据处理提供了更加面向对象的抽象,提高了开发效率和性能。DataFrame...

    Spark实战高手之路-第5章Spark API编程动手实战(1)

    ### Spark实战高手之路-第5章Spark API编程动手实战(1) #### 一、基础知识概述 **Spark**作为一项先进的大数据处理技术,在云计算领域占据着举足轻重的地位。本书《Spark实战高手之路》旨在帮助读者从零开始,...

    浅谈Spark RDD API中的Map和Reduce

    本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 举例:从普通数组创建RDD,里面包含了1到9

Global site tag (gtag.js) - Google Analytics