`
bit1129
  • 浏览: 1076891 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark七十九】Spark RDD API一

 
阅读更多

aggregate

package spark.examples.rddapi

import org.apache.spark.{SparkConf, SparkContext}

//测试RDD的aggregate方法
object AggregateTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("AggregateTest_00")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List(1, 3, 5, 7, 7, 5, 3, 3, 79), 2)
    /**
     * Aggregate the elements of each partition, and then the results for all the partitions, using
     * given combine functions and a neutral "zero value". This function can return a different result
     * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
     * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
     * allowed to modify and return their first argument instead of creating a new U to avoid memory
     * allocation.
     */

    // def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    //T是RDD中的元素类型,U是aggregate方法自定义的泛型参数,aggregate返回U(而不一定是T)
    //两个分区取最大值,然后相加
    //math.max(_, _)表示针对每个partition实施的操作, _ + _表示combiner

    val r1 = z1.aggregate(0)(math.max(_, _), _ + _)
    println(r1) //86

    //RDD元素类型字符串,aggregate的返回类型同样为String
    val z2 = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)
    val r2 = z2.aggregate("xx")(_ + _, _ + _)
    println(r2) //连接操作,结果xxxxabcxxdef,每个分区计算时,加上xx,最后两个分区计算时,继续把xx加上

    //_ + _的道理也是(x,y) => x + y
    //(x,y)=>math.max是做两两比较吗?
    val z3 = sc.parallelize(List("12", "23", "345", "4567"), 2)
    val r3 = z3.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y)
    println(r3)   ///结果24,表示两个分区的字符串长度最长的长度转成String后,做拼接

    //结果为什么是11?
    val r4 = sc.parallelize(List("12", "23", "345", "4567"), 2).aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
    println(r4)
  }
}

cartesian

 

package spark.examples.rddapi

import org.apache.spark.rdd.{CartesianRDD, RDD}
import org.apache.spark.{SparkContext, SparkConf}


object CartesianTest_01 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("AggregateTest_00")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List(2, 3, 4, 5, 6), 2)
    val z2 = sc.parallelize(List("A", "B", "C", "D", "E", "F", "G", "H", "I", "J"), 3)

    /**
     * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
     * elements (a, b) where a is in `this` and b is in `other`.
     */

    //def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
    //z1 和 z2集合的元素类型可以不同,并且cartesian是个转换算子,
    //调用z.collect触发作业
    val z = z1.cartesian(z2)
    println("Number of partitions: " + z.partitions.length) //6
    var count = 0

    z.collect().foreach(x  => {println(x._1 + "," + x._2); count = count + 1}) //

   println("count =" + count) //50

 

 

checkpoint

 

注意点:

Checkpointed RDDs are stored as a binary file within the checkpoint directory which can be specied using the Spark context. (Warning: Spark applies lazy evaluation. Checkpointing will not occur until an action is invoked.) Important note: the directory "my directory name" should exist in all slaves. As an alternative you could use an HDFS directory URL as well.

 

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object CheckpointTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("AggregateTest_00")
    val sc = new SparkContext(conf);
    val z = sc.parallelize(List(3, 6, 7, 9, 11))
    sc.setCheckpointDir("file:///d:/checkpoint")
    /**
     * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
     * directory set with SparkContext.setCheckpointDir() and all references to its parent
     * RDDs will be removed. This function must be called before any job has been
     * executed on this RDD. It is strongly recommended that this RDD is persisted in
     * memory, otherwise saving it on a file will require recomputation.
     */
    z.checkpoint()
    println("length: " + z.collect().length) //rdd存入目录
    println("count: " + z.count()) //5
  }
}

 

 

d:\checkpoint>tree /f
文件夹 PATH 列表
卷序列号为 EA23-0890
D:.
└─9b0ca0d9-f7fb-46bb-84dc-097d95b9e7b8
    └─rdd-0
            .part-00000.crc
            part-00000

1. 运行过程中发现,checkpoint目录会自动创建,无需预创建

2.程序运行结束后,checkpoint目录并没有删除,上面这些属于checkpoint目录下的目录和文件也没有删除,再次运行会产生新的目录

 

Repartition

 

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object RepartitionTest_04 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("RepartitionTest_04")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List(3, 9, 18, 22, 11, 9, 8), 3)
    //z1.coalesce(5, true)的效果一样,开启shuffle
    /**
     * Return a new RDD that has exactly numPartitions partitions.
     *
     * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
     * a shuffle to redistribute data.
     *
     * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
     * which can avoid performing a shuffle.
     */
    val r1 = z1.repartition(5)
     r1.collect().foreach(println)
  }
}

 

coalesce

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

//coalesce:合并
object CoalesceTest_03 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoalesceTest_03")
    val sc = new SparkContext(conf);
    val z = sc.parallelize(List(3, 9, 18, 22, 11, 9, 8), 3)

    /**
     * Return a new RDD that is reduced into `numPartitions` partitions.
     *
     * This results in a narrow dependency, e.g. if you go from 1000 partitions
     * to 100 partitions, there will not be a shuffle, instead each of the 100
     * new partitions will claim 10 of the current partitions.
     *
     * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
     * this may result in your computation taking place on fewer nodes than
     * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
     * you can pass shuffle = true. This will add a shuffle step, but means the
     * current upstream partitions will be executed in parallel (per whatever
     * the current partitioning is).
     *
     * Note: With shuffle = true, you can actually coalesce to a larger number
     * of partitions. This is useful if you have a small number of partitions,
     * say 100, potentially with a few partitions being abnormally large. Calling
     * coalesce(1000, shuffle = true) will result in 1000 partitions with the
     * data distributed using a hash partitioner.
     */
    //shuffle默认为false
    //将分区数由3变成2,大变小使用narrow dependency
    val zz = z.coalesce(2, false)
    println("Partitions length: " + zz.partitions.length) //2
    println(zz.collect()) //结果是[I@100498c?
    zz.collect().foreach(println)

    //将分区数由3变成6,少变多必须使用shuffle=true
    //在单机上没有发现有问题
    //在cluster环境下,为了保证新的分区分布到不同的节点,应该使用shuffle为true
    //也就是说,少变多也可以使用shuffle为false,但是达不到分区数据进行重新分布的目的
    val z2 = z.coalesce(6, false)
    z2.collect().foreach(println)

    //分区扩大,同时设置shuffle为true
    val z3 = z.coalesce(6, true)
    z3.collect().foreach(println)




  }
}

 

 

 

 

分享到:
评论

相关推荐

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    在本文中,将详细介绍Spark的RDD API,以及如何在RDD、DataFrame和Dataset之间相互转换,以及如何使用Spark SQL进行操作。 首先来看RDD API的聚合操作aggregate。aggregate函数是RDD API中非常重要的一个动作操作,...

    spark-rdd-APi

    标题:“Spark RDD API”说明了本文档将专注于Apache Spark中弹性分布式数据集(RDD)的API。RDD是Spark的核心概念,它是一个容错的、并行的数据结构,使得用户能够处理大数据集。本文档将基于Scala语言中的RDD实现...

    spark rdd api

    本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了

    Spark学习--RDD编码

    Spark中的RDD是一个不可变的分布式对象集合。每个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、Java、Scala中任意类型的对象,甚至可以包含用户自定义对象,本文主要通过Java实现相关...

    电影评分数据汇总(使用spark2.4+scala, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    spark API RDD

    Spark提供了多种高级API,其中RDD(Resilient Distributed Dataset,弹性分布式数据集)是其核心抽象之一,代表了一个不可变、分区的数据集,可以进行并行操作。本文将详细介绍Spark中的RDD API,这些知识点对初学者...

    spark RDD操作详解

    **RDD(弹性分布式数据集)**是Apache Spark的核心抽象,代表一种不可变的、可分区的、能够进行并行操作的数据集合。它提供了丰富的API来支持高效的大规模数据处理任务。 - **只读性**:一旦创建了RDD,其内容不能...

    spark_API文档

    Spark API文档是大数据领域中非常重要的参考资料,它涵盖了Apache Spark的核心功能和编程接口。Spark作为一个快速、通用且可扩展的数据处理引擎,广泛应用于数据分析、机器学习以及实时流处理等多个场景。Scala是...

    spark RDD 论文 中文版

    Spark RDD(Resilient Distributed Datasets)作为Apache Spark的核心组件之一,在大数据处理领域扮演着至关重要的角色。本论文旨在探讨Spark RDD的设计理念及其在大数据处理中的优势,并通过具体的案例来证明其有效性...

    spark2.1.0.chm(spark java API)

    1. Resilient Distributed Dataset (RDD):Spark的核心数据结构,是不可变、分区的元素集合,具有容错性。Java API提供了创建和操作RDD的方法。 2. DataFrame:基于DataFrame的API提供了更高级别的抽象,它是一个表...

    Spark2.0.2API

    Spark 2.0.2 API 是 Apache Spark 的一个重要版本,主要针对大数据处理和分析提供了丰富的编程接口。这个版本尤其强调了 Scala 语言的使用,使得数据科学家和开发人员能够利用 Scala 的强大功能来构建高性能的数据...

    Spark实战高手之路-第5章Spark API编程动手实战(1)

    ### Spark实战高手之路-第5章Spark API编程动手实战(1) #### 一、基础知识概述 **Spark**作为一项先进的大数据处理技术,在云计算领域占据着举足轻重的地位。本书《Spark实战高手之路》旨在帮助读者从零开始,...

    Spark api chm格式下载.rar

    - **Transformation** 和 **Action**:Spark API中的两种主要操作类型,Transformation创建新的RDD或DataFrame,Action触发计算并可能返回结果到Driver程序。 - ** DAG(Directed Acyclic Graph)**:Spark的任务...

    spark RDD 论文

    - **Spark 的实现**:Spark 中的 RDD 是通过 Scala 语言实现的,并提供 Java 和 Python 等语言的 API 接口。此外,Spark 还提供了对 SQL 查询的支持以及机器学习库 MLlib。 - **用户案例与基准测试**:Spark 项目...

    sparkrdd的讲解

    ### Spark RDD详解 #### Spark计算模型与RDD概念 在探讨Spark的弹性分布式数据集(RDD)之前,我们首先需要理解Spark的基本计算模型。Spark是一种基于内存的分布式计算框架,其核心设计思想在于通过缓存中间结果来...

    spark-scala-api

    Spark Scala API 是一个用于大数据处理的强大工具,它结合了Apache Spark的高性能计算框架与Scala编程语言的简洁性和表达力。这个zip压缩包很可能是包含了Spark的Scala开发接口及相关示例,便于开发者在Scala环境中...

    Spark 1.0.0 API (java)

    Spark 1.0.0版本是其发展中的一个重要里程碑,为开发者提供了强大的分布式计算能力,特别是对于Java开发者而言,Spark提供了丰富的Java API,使得在Java环境中进行大数据处理变得更加便捷。 ### 1. Spark核心概念 ...

    浅谈Spark RDD API中的Map和Reduce

    本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 举例:从普通数组创建RDD,里面包含了1到9

    spark rdd 实战 ,基本语法

    本文将对 Spark RDD 进行深入浅出的讲解,涵盖 Spark 的基本特性、生态体系、优势、支持的 API、运行模式、RDD 的概念和类型、容错 Lineage、缓存策略等知识点。 Spark 的基本特性 Spark 是一个高可伸缩性、高...

    spark 2.0.1 JavaAPI

    Spark 2.0.1是Apache Spark的一个重要版本,它为大数据处理提供了高效、易用的框架。在Java API方面,Spark提供了丰富的类库和接口,使得开发人员能够利用Java语言方便地进行分布式计算。本资源是关于Spark 2.0.1 ...

Global site tag (gtag.js) - Google Analytics