`
bit1129
  • 浏览: 1067727 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十】Spark RDD API二

 
阅读更多

coGroup

package spark.examples.rddapi

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object CoGroupTest_05 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (11, "F"), (81, "Y"), (77, "Z"), (31, "X")), 3)
    val z2 = sc.parallelize(List((4, "ABC"), (6, "B2"), (7, "Z2"), (7, "Z3"), (91, "E"), (11, "FF"), (88, "N"), (77, "S"), (36, "M")), 4)
    //隐式函数,定义于PairRDDFunctions
    //结果由两个(至多四个)RDD的Key组成,(Key,(ValuesOfRDD1Seq, ValuesOfRDD2Seq, ValuesOfRDD3Seq))
    //cogroup [W]( other : RDD [(K, W)]): RDD [(K, (Seq [V], Seq [W]))]
    //cogroup [W1 , W2 ]( other1 : RDD [(K, W1)], other2 : RDD [(K, W2)]): RDD [(K , (Seq[V], Seq[W1], Seq[W2 ]))]
    val r = z1.cogroup(z2)
    r.collect.foreach(println)
    /*Result:,
(4,(CompactBuffer(),CompactBuffer(ABC)))
(36,(CompactBuffer(),CompactBuffer(M)))
(88,(CompactBuffer(),CompactBuffer(N)))
(81,(CompactBuffer(Y),CompactBuffer()))
(77,(CompactBuffer(Z),CompactBuffer(S)))
(9,(CompactBuffer(E),CompactBuffer()))
(6,(CompactBuffer(B1),CompactBuffer(B2)))
(11,(CompactBuffer(F),CompactBuffer(FF)))
(3,(CompactBuffer(A),CompactBuffer()))
(7,(CompactBuffer(Z1),CompactBuffer(Z2, Z3)))
(91,(CompactBuffer(),CompactBuffer(E)))
(31,(CompactBuffer(X),CompactBuffer()))

     */
  }
}

 

groupBy

package spark.examples.rddapi

import org.apache.spark.{Partitioner, SparkContext, SparkConf}

object GroupByTest_06 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)
    /**
     * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
     * mapping to that key. The ordering of elements within each group is not guaranteed, and
     * may even differ each time the resulting RDD is evaluated.
     *
     * Note: This operation may be very expensive. If you are grouping in order to perform an
     * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
     * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
     */
    //  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =  groupBy[K](f, defaultPartitioner(this))
    //根据指定的函数进行分组,分组得到的集合的元素类型是(K,V),K是分组函数的返回值,V是组内元素列表
    val r = z1.groupBy(x => if (x._1 % 2 == 0) "even" else "odd")
    r.collect().foreach(println)
    //结果:
    /*
    (even,CompactBuffer((6,B1)))
   (odd,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X)))
     */

    //Partitioner是HashPartitioner
    val r2 = z1.groupBy(_._1 % 2)
    r2.collect().foreach(println)
    //结果:
    /*
    (0,CompactBuffer((6,B1)))
    (1,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X)))
    */

    class MyPartitioner extends Partitioner {
      override def numPartitions = 3

      def getPartition(key: Any): Int = {
        key match {
          case null => 0
          case key: Int => key % numPartitions
          case _ => key.hashCode % numPartitions
        }
      }

      override def equals(other: Any): Boolean = {
        other match {
          case h: MyPartitioner => true
          case _ => false
        }
      }
    }
    println("=======================GroupBy with Partitioner====================")
    //分组的同时进行分区;分区的key是分组函数的计算结果?
    val r3 = z1.groupBy((x:(Int, String)) => x._1, new MyPartitioner())
    r3.collect().foreach(println)
    /*
    //6,3,9一个分区,7,31一个分区,77一个分区
    (6,CompactBuffer((6,B1)))
    (3,CompactBuffer((3,A)))
    (9,CompactBuffer((9,E), (9,Y)))
    (7,CompactBuffer((7,Z1), (7,F)))
    (31,CompactBuffer((31,X)))
    (77,CompactBuffer((77,Z)))
    */

  }


}

 

 collect

 

 

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object CollectTest_07 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Return an array that contains all of the elements in this RDD.
     */
    //这是一个行动算子
    z1.collect().foreach(println)

    /**
     * Return an RDD that contains all matching values by applying `f`.
     */
    //    def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
    //      filter(f.isDefinedAt).map(f)
    //    }

//    val f  = {
//      case x: (Int, String) => x
//    }
//    val z2 = z1.collect(f)
//    println(z2)
  }
}

 

RDD有个toArray方法,已经不推荐使用了,推荐使用collect方法

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在本文中,将详细介绍Spark的RDD API,以及如何在RDD、DataFrame和Dataset之间相互转换,以及如何使用Spark SQL进行操作。 首先来看RDD API的聚合操作aggregate。aggregate函数是RDD API中非常重要的一个动作操作,...

    spark-rdd-APi

    标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...

    spark rdd api

    本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了

    Spark学习--RDD编码

    在Java中,函数需要作为实现了Spark的org.apache,spark.api.java.function包中的任一函数接口的对象传递。 函数名 实现的方法 用途 Function, R> R call(T) 接收一个输入值并返回一个输出值,用于类似map() 和filter...

    电影评分数据汇总(使用spark2.4+scala, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    spark API RDD

    Spark提供了多种高级API,其中RDD(Resilient Distributed Dataset,弹性分布式数据集)是其核心抽象之一,代表了一个不可变、分区的数据集,可以进行并行操作。本文将详细介绍Spark中的RDD API,这些知识点对初学者...

    spark_API文档

    2. **RDD(Resilient Distributed Datasets)**:RDD是Spark的核心抽象,表示不可变、分区的记录集合。RDD支持操作如转换(transformations)和行动(actions)。转换创建新的RDD,而行动触发计算并可能返回结果到...

    spark RDD 论文 中文版

    ### Spark RDD论文中文版知识点详述 #### 一、引言 Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在...

    Spark2.0.2API

    Spark 2.0.2 API 是 Apache Spark 的一个重要版本,主要针对大数据处理和分析提供了丰富...在实际项目中,结合压缩包文件"spark2_0_2scalaAPI"中的文档,可以详细地了解每个接口的功能和使用方法,进一步提升开发效率。

    浅谈Spark RDD API中的Map和Reduce

    本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 举例:从普通数组创建RDD,里面包含了1到9

    【SparkCore篇07】RDD数据读取和保存1

    2. 将RDD保存为Sequence文件:`rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")` 3. 读取Sequence文件:`val seq = sc.sequenceFile[Int, Int]("file:///opt/module/spark/seqFile")` 4. 打印读取的...

    spark RDD 论文

    - **Spark 的实现**:Spark 中的 RDD 是通过 Scala 语言实现的,并提供 Java 和 Python 等语言的 API 接口。此外,Spark 还提供了对 SQL 查询的支持以及机器学习库 MLlib。 - **用户案例与基准测试**:Spark 项目...

    spark-scala-api

    Spark Scala API 是一个用于大数据处理的强大工具,它结合了Apache Spark的高性能计算框架与Scala编程语言的简洁性和表达力。这个zip压缩包很可能是包含了Spark的Scala开发接口及相关示例,便于开发者在Scala环境中...

    sparkrdd的讲解

    ### Spark RDD详解 #### Spark计算模型与RDD概念 在探讨Spark的弹性分布式数据集(RDD)之前,我们首先需要理解Spark的基本计算模型。Spark是一种基于内存的分布式计算框架,其核心设计思想在于通过缓存中间结果来...

    Spark常用的算子以及Scala函数总结.pdf

    了解 Scala 是学习 Spark 的一大优势,因为 Spark 中的许多高级功能和 API 都是用 Scala 实现的,因此使用 Scala 访问这些功能和 API 会更加简单。 在 Spark 中,数据以弹性分布式数据集(RDD)的形式存在,RDD 是 ...

    Spark实战高手之路-第5章Spark API编程动手实战(1)

    - **Spark API核心概念**:Spark的核心数据结构是RDD(Resilient Distributed Dataset),这是一种只读的分区记录集合,支持高效的数据并行处理。学习Spark API时,需要熟悉RDD的各种操作,包括转换(Transformation...

    Spark 1.0.0 API (java)

    **Spark 1.0.0 API (Java) 深度解析** Spark 是一个快速、通用且可扩展的大数据处理框架,它最初由加州大学伯克利分校AMPLab开发,并随后成为Apache软件基金会的顶级项目。Spark 1.0.0版本是其发展中的一个重要里程...

    spark 2.0.1 JavaAPI

    2. **Spark SQL**:Spark SQL是Spark处理结构化数据的模块,它将SQL查询与DataFrame API集成,使用户可以方便地在SQL和DataFrame之间切换。 3. **Spark Streaming**:提供了一个高级抽象,用于处理连续的数据流,...

Global site tag (gtag.js) - Google Analytics