`

Spark基础-RDD编程

 
阅读更多

一、创建RDD

Spark提供了两种创建RDD的方式,读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

1、读取外部数据:

val lines = sc.textFile("/path/to/README.md")

2、对集合进行并行化

val lines = sc.parallelize(List("pandas", "i like pandas"))

二、RDD操作

RDD支持两种操作:转化操作行动操作。RDD的转化操作是返回一个新的RDD的操作,比如map()和filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count()和first()。

如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

2.1、转化操作

RDD的转化操作是返回新的RDD的操作,转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD时才会被计算。

2.2、行动操作

行动操作是第二种类型的RDD操作,它们会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的RDD的转化操作。

2.3、惰性求值

上面提过,RDD的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark不会开始计算。惰性求值意味着当我们对RDD调用转化操作(例如调用map())时,操作不会立即执行。相反,Spark会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD看作存放着特定数据的数据集,而最好把每个RDD当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读取到RDD的操作同样也是惰性的。

三、常见的转化操作和行动操作

3.1、基本RDD

3.1.1、针对各个元素的转化操作

你很可能会用到的两个最常用的转化操作是map()和filter()。

1)map()

转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值。

让我们看一个简单的例子,用map()对RDD中的所有数求平方。

val input = sc.parallelize(List(1,2,3,4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

2)flatMap()

有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作flatMap()。和map()类似,我们提供给flatMap()的函数被分别应用到了输入RDD的每个元素上。

3)filter()

转化操作filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first()

flatMap()和map()的区别:

你可以把flatMap()看作将返回的迭代器“拍扁”,这样就得到了一个由各列表中的元素组成的RDD,而不是一个由列表组成的RDD。

3.1.2、伪集合操作

尽管RDD本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。注意:这些操作都要求操作的RDD是相同数据类型的。

4)distinct()

我们的RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只要唯一的元素,我们可以使用RDD.distinct()转化操作来生成一个只包含不同元素的新RDD。不过需要注意,distinct()操作的开销很大,因为它需要将所有数据通过网络进行混洗,以确保每个元素都只有一份。

val RDD1 = sc.parallelize(Array("coffee","coffee","panda","monkey","tea"))
val OUTPUT = RDD1.distinct()
// 结果为"coffee","panda","monkey","tea"

 5)union()

最简单的集合操作是union(other),它会返回一个包含两个RDD中所有元素的RDD。这在很多用例下都很有用,比如处理来自多个数据的日志文件。

val RDD1 = sc.parallelize(Array("coffee","coffee","panda","monkey","tea"))
val RDD2 = sc.parallelize(Array("coffee","monkey","kitty"))
val OUTPUT = RDD1.union(DDR2)
// 结果为"coffee","panda","monkey","tea","coffee","monkey","kitty"

 6)intersection()

Spark还提供了insertsection(other)方法,只返回两个RDD中都有的元素。insertsection()在运行时也会去掉所有重复的元素(单个RDD内的重复元素也会一起移除)。因为它需要通过网络混洗数据来发现共有的元素,所以它的性能比较差。

val RDD1 = sc.parallelize(Array("coffee","coffee","panda","monkey","tea"))
val RDD2 = sc.parallelize(Array("coffee","monkey","kitty"))
val OUTPUT = RDD1.intersection(DDR2)
// 结果为"coffee","monkey"

 7)subtract()

有时我们需要移除一些数据。subtract(other)函数接收另一个RDD作为参数,返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。和intersection()一样,它也需要数据混洗。

val RDD1 = sc.parallelize(Array("coffee","coffee","panda","monkey","tea"))
val RDD2 = sc.parallelize(Array("coffee","monkey","kitty"))
val OUTPUT = RDD1.subtract(DDR2)
// 结果为"panda","tea"

 8)cartesian()

我们也可以计算两个RDD的笛卡尔积。cartesian(other)转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则来自另一个RDD。

3.1.3、行动操作

1)reduce()

它接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD进行累加。

val sum = rdd.reduce((x,y)=>x+y)

 再看一个例子,给定一个RDD[Int]求平均数:

// 创建一个RDD
val rdd1 = sc.parallelize(List(1,5,2,7,3,1,3,9,10))
// 使用map()将rdd1转化为二元组
val rdd2 = rdd1.map(x => (x, 1))
// 使用reduce()计算总和和总计数
val rdd3 = rdd2.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
// 求平均值
val rdd4 = rdd3._1 / rdd3._2.toDouble

 

分享到:
评论

相关推荐

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    - RDD(弹性分布式数据集):Spark的基础数据结构,是不可变、分区的数据集合,可以在集群中并行操作。 - DataFrame:Spark SQL引入的数据模型,它是基于表和列的抽象,提供了更高级别的抽象和优化。 - Dataset:...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    spark-assembly-1.5.2-hadoop2.6.0.jar

    《Spark编程核心组件:spark-assembly-1.5.2-hadoop2.6.0.jar详解》 在大数据处理领域,Spark以其高效、易用和灵活性脱颖而出,成为了许多开发者的首选框架。Spark-assembly-1.5.2-hadoop2.6.0.jar是Spark中的一个...

    spark实验5 rdd编程2.doc

    ### Spark 实验报告:RDD 编程应用 #### 实验目的与背景 本实验旨在通过具体的数据处理任务,深入理解Apache Spark中Resilient Distributed Datasets (RDD) 的使用方法及其在解决实际问题中的作用。实验选取了一所...

    spark-1.6.0-bin-hadoop2.6.tgz

    Spark支持多种编程语言,包括Scala、Java、Python和R。开发者可以根据需求选择合适的语言编写应用程序,然后使用`spark-submit`脚本来提交任务到集群。 **6. 性能调优** Spark性能优化主要包括内存管理、任务调度...

    spark-3.1.2-bin-hadoop3.2.tgz

    2. **Resilient Distributed Datasets (RDD)**:RDD是Spark的基本数据结构,它是不可变、分区的元素集合,能够在集群中的节点上分布式存储。 3. **弹性**:Spark提供了容错机制,当工作节点失败时,可以自动恢复丢失...

    spark-2.4.7-bin-hadoop2.6.tgz

    3. **多模式编程**:Spark支持多种编程语言,包括Java、Scala、Python和R,这使得开发人员可以根据自己的喜好选择合适的工具。 4. **Spark SQL**:Spark SQL是Spark的一个模块,用于处理结构化数据,它集成了SQL...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    1. **Spark**: Spark的核心在于它的弹性分布式数据集(RDD),这是一个容错的内存计算模型。它提供了一组高级APIs,支持批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)和机器学习(MLlib)等多种...

    spark-2.1.1-bin-hadoop2.7.tgz.7z

    Scala 2.11是这个Spark版本所使用的编程语言基础,它是一种静态类型的函数式编程语言,适合构建大规模并行和分布式应用。Spark的API主要由Scala编写,这使得Spark可以利用Scala的强类型和模式匹配功能,提供简洁、...

    spark-2.3.0-bin-hadoop2.7版本.zip

    Hadoop2.7是Hadoop生态系统中的一个版本,它为Spark提供了分布式存储的基础,即HDFS(Hadoop Distributed File System)。Spark与Hadoop的集成,使得Spark可以无缝地读取和写入Hadoop的数据,进一步增强了其在大数据...

    spark2.3.0-hadoop2.6.tgz

    Scala是Spark的基础编程语言,它的版本需要与Spark兼容。Scala提供了一种强大的面向对象和函数式编程模型,使得编写分布式计算程序变得更为便捷。 Hadoop 2.6是Apache Hadoop的一个版本,它是大数据存储和处理的...

    03_SparkRDD(RDD编程实战)

    Spark RDD(弹性分布式数据集)...它们是Spark进行大数据分析的基础,通过这些操作可以构建复杂的分布式计算任务。在实际工作中,可以根据需要处理的数据类型和业务需求,灵活组合这些操作,实现高效的数据处理和分析。

    spark-2.4.6-bin-2.6.0-cdh5.7.0.tgz

    Spark 2.4.6包含了多个组件,包括Spark SQL用于结构化数据处理,Spark Streaming用于实时流处理,MLlib用于机器学习,GraphX用于图计算,以及Spark Core作为基础框架。Spark SQL提供了SQL接口,使得用户可以通过SQL...

    spark-2.0.0-bin-hadoop2.7.tgz.zip

    Spark的核心在于它的弹性分布式数据集(Resilient Distributed Datasets,简称RDD),这是一种可以在集群中并行操作的数据结构。RDD具有容错性,一旦创建,就能在节点故障时自动恢复。Spark 2.0.0在此基础上引入了...

    spark-2.4.0-bin-without-hadoop.tgz

    Spark Core 是 Spark 的基础,负责任务调度、内存管理、网络通信和存储接口。在 2.4.0 版本中,核心模块进行了性能调优,包括更快的 shuffle 操作和更好的资源调度。 GraphX 是 Spark 处理图形数据的模块,它提供了...

    Python大数据分析&人工智能教程 - Spark-SQL编程实例(含源码和学习思维导图)

    Spark-SQL概述、特点、组成、Spark-SQL语句模块解析、Spark-SQL架构、Dataframe API编程等内容 本资源包为您提供了一个全面的Python大数据分析与人工智能教程,特别关注Apache Spark中的Spark-SQL模块。Spark-SQL是...

    spark-2.4.0-bin-hadoop2.6.tgz

    1. **Spark核心概念**:Spark的核心组件是弹性分布式数据集(Resilient Distributed Datasets, RDD),这是一种容错的、不可变的数据集合,可以在集群中的多个节点上并行操作。此外,Spark还提供了DataFrame和...

    RDD编程初级实践数据集

    通过这个“RDD编程初级实践数据集”,初学者可以动手操作,学习如何在Spark中创建、转换和操作RDD,以及理解其容错机制和性能优化策略。实践中遇到的问题和解决方案将有助于深入理解Spark的工作原理和最佳实践。

    spark-2.1.0-bin-without-hadoop.tgz

    9. **Spark编程模型**:Spark的编程模型包括Scala、Java、Python和R,提供了丰富的API,使得开发者可以根据喜好和项目需求选择合适的语言。 10. **案例应用**:Spark广泛应用于推荐系统、实时分析、日志分析、图...

Global site tag (gtag.js) - Google Analytics