`
bit1129
  • 浏览: 1069565 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark九十六】RDD API之combineByKey

 
阅读更多

1. combineByKey函数的运行机制

 

RDD提供了很多针对元素类型为(K,V)的API,这些API封装在PairRDDFunctions类中,通过Scala隐式转换使用。这些API实现上是借助于combineByKey实现的。combineByKey函数本身也是RDD开放给Spark开发人员使用的API之一

 

首先看一下combineByKey的方法说明:

 

/**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

combineByKey的功能是对RDD中的数据集按照Key进行聚合(想象下Hadoop MapReduce的Combiner,用于Map端做Reduce)。聚合的逻辑是通过自定义函数提供给combineByKey。

从上面的源代码中可以看到,combineByKey是把(K,V)类型的RDD转换为(K,C)类型的RDD,C和V可以不一样。

combineByKey函数需要三个重要的函数作为参数

createCombiner:在遍历RDD的数据集合过程中,对于遍历到的(k,v),如果combineByKey第一次遇到值为k的Key(类型K),那么将对这个(k,v)调用combineCombiner函数,它的作用是将v转换为c(类型是C,聚合对象的类型,c作为局和对象的初始值)

mergeValue:在遍历RDD的数据集合过程中,对于遍历到的(k,v),如果combineByKey不是第一次(或者第二次,第三次...)遇到值为k的Key(类型K),那么将对这个(k,v)调用mergeValue函数,它的作用是将v累加到聚合对象(类型C)中,mergeValue的

类型是(C,V)=>C,参数中的C遍历到此处的聚合对象,然后对v进行聚合得到新的聚合对象值

mergeCombiners:因为combineByKey是在分布式环境下执行,RDD的每个分区单独进行combineByKey操作,最后需要对各个分区的结果进行最后的聚合,它的函数类型是(C,C)=>C,每个参数是分区聚合得到的聚合对象。

 

通过上面的分析,combineByKey的流程是:

 

假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner 将第一个 record 的value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c,
record.value) 来更新 c,比如想要对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,
combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算。

 

2.combineByKey应用举例

2.1 求均值

假设有一组气象数据,每行数据包含日期和当天的气温(比如,20150601 27),那么可以用combineByKey求每月的平均温度,伪代码如下:

 

C的类型是(Int,Int),表示对于给定的月份,温度累加值和当月的天数

 

val rdd = sc.textFile("气象数据")
val rdd2 = rdd.map(x=>x.split(" ")).map(x => (x(0).substring("从年月日中提取年月"),x(1).toInt))
val createCombiner = (k: String, v: Int)=> {
  (v,1)
}
val mergeValue = (c:(Int, Int), v:Int) => {
 (c._1 + v, c._2 + 1)
}

val mergeCombiners = (c1:(Int,Int),c2:(Int,Int))=>{
  (c1._1 + c2._1, c1._2 + c2._2)
}


val vdd3 = vdd2.combineByKey(
createCombiner,
mergeValue,
mergeCombiners
)

rdd3.foreach(x=>println(x._1 + ": average tempreture is " + x._2._1/x._2._2)

 

 

 

 

 

分享到:
评论

相关推荐

    spark-rdd-APi

    标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...

    RDD编程API

    ### RDD编程API详解 #### 一、概述 在Apache Spark框架中,弹性分布式数据集(Resilient Distributed Dataset,简称RDD)是基本的数据抽象。它是一个不可变的、分布式的对象集合,可以并行地作用于集群上的节点。...

    Spark API Master

    在大数据处理领域,Apache Spark凭借其高效的数据处理能力和丰富的API集合成为业界的首选工具之一。本文将根据《Apache Spark API by Example》这本书的内容,详细介绍Spark API的核心功能,并通过具体的例子来加深...

    spark-in-action

    - **理解RDD**:RDD(Resilient Distributed Dataset)是一种只读的分区数据集合,它是Spark中最基本的数据抽象。RDD具有高度容错性和可扩展性,支持各种数据操作,如map、reduce、filter等。通过这些操作,用户可以...

    spark的常用操作

    通过以上介绍可以看出,Spark 提供了丰富的 API 用于数据处理,这些 API 使得开发人员能够高效地处理大规模数据集。无论是进行简单的数据转换还是复杂的聚合操作,Spark 都能提供相应的工具来满足需求。

    Next-Gen Big Data Analytics using the Spark stack.zip

    为了提高性能,Spark提供了多种优化策略,如Tungsten自动代码生成、Shuffle优化、宽依赖的CombineByKey等。此外,用户还可以通过调整executor数量、内存分配、任务并发度等参数来优化作业性能。 九、Spark的生态...

    storm与spark简介

    1. **RDD (Resilient Distributed Dataset)**: 弹性分布式数据集是 Spark 中最基本的抽象数据类型。它是一个不可变的、可分区的集合,可以在集群中并行处理。RDD 具有以下几个特点: - **容错性**:RDD 能够自动...

    大数据spark计算TopN的素材.rar

    这个过程可以通过使用PairRDD,利用reduceByKey或combineByKey等操作来实现。 4. 使用Spark SQL的DataFrame/Dataset API Spark SQL提供DataFrame和Dataset API,它们是Spark的高级接口,具有更丰富的SQL语句支持和...

    Kafka-Spark-stream-with-static-data-using-join:使用join的Kafka Spark流与静态数据

    3. 数据更新:如果静态数据需要定期更新,可以考虑使用Spark的UpdateStateByKey或CombineByKey功能,以便随着时间推移维护和更新状态。 在完成所有的处理后,你可以使用`DStream.foreachRDD`将结果写回到另一个...

    pyspark-tutorial:PySpark-Tutorial提供使用PySpark的基本算法

    PySpark是用于Spark的Python API。 PySpark教程的目的是提供使用PySpark的基本分布式算法。 PySpark具有用于基本测试和调试的交互式外壳程序( $SPARK_HOME/bin/pyspark ),不应将其用于生产环境。 您可以使用$...

Global site tag (gtag.js) - Google Analytics