第15课:RDD创建内幕
1. RDD的创建方式
Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法
Spark中的基本方式:
1) 使用程序中的集合创建
这种方式的实际意义主要用于测试。
2) 使用本地文件系统创建
这种方式的实际意义主要用于测试大量数据的文件
3) 使用HDFS创建RDD
这种方式为生产环境中最常用的创建RDD的方式
4) 基于DB创建
5) 基于NoSQL:例如HBase
6) 基于S3(SC3)创建
7) 基于数据流创建
2. RDD创建实战
1) 通过集合创建
代码:
object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val numbers = 1 to 100 //创建一个Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_) //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
}
}
结果:
2) 通过本地文件系统创建
代码:
object RDDBasedOnLocalFile { def main (args: Array[String]) { val conf = new SparkConf()//create SparkConf conf.setAppName("RDDBasedOnCollection")//set app name conf.setMaster("local")//run local val sc =new SparkContext(conf) val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt") val linesLength=rdd.map(line=>line.length()) val sum = linesLength.reduce(_+_) println("the total characters of the file"+"="+sum) } }
结果:
3) 通过HDFS创建RDD
代码:
val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)
结果:
关于spark并行度:
1.默认并行度为程序分配到的cpu core的数目
2.可以手动设置并行度,并行度最佳实践
1. 2-4 partitions for each CPU core
2.综合考虑cpu和内存
注:本内容原型来自 IMP 课程笔记
相关推荐
1. 创建RDD:这通常是通过读取外部数据集或在Driver程序中分布对象集合(如列表或集合)来完成。 2. 转换操作(Transformation):这类操作会生成新的RDD,但并不会立即执行计算。例如,`filter()`、`map()`、`...
除了上述两种常见方式外,还可以通过读取数据库等方式创建 RDD,甚至通过其他 RDD 转换而来。例如,从 MySQL 数据库中读取数据,可以先将数据导出为文件形式,再通过 `textFile` 方法读取。 #### 三、RDD 编程 API ...
1. 从外部数据创建出输入RDD; 2. 使用诸如filter()等这样的转化操作对RDD进行转化,以定义新的RDD; 3. 告诉Spark对需要被重用的中间结果RDD执行persist()操作; 4. 使用诸如first()等这样的行动操作来触发一次并行...
这些测试通常会包括创建测试数据,调用自定义RDD的函数,然后检查结果是否符合预期。测试对于确保代码的正确性和性能至关重要,特别是在分布式环境下,它们可以帮助发现潜在的问题并优化性能。 自定义RDD从HDFS读取...
根据提供的文档内容,我们可以深入探讨有关Resilient Distributed Datasets(简称RDD)的相关知识点,特别是其创建方式与转换操作。 ### 1. RDD 创建 #### 1.1 从列表创建 RDD 在Spark中,可以通过从一个Python...
在创建RDD时,可以使用两种主要的方式来初始化数据:一种是从本地集合元素中创建,另一种是从外部数据源(如文本文件)中创建。无论哪种方式,创建RDD后,用户可以对数据集执行一系列的转换操作。 转换操作包括但不...
但在实际的RDD编程中,我们还需要了解如何创建、转换和行动RDD。创建RDD通常通过`parallelize()`函数,将已有的数据集转化为分布式数据集。转换操作(transformations)如`map()`、`filter()`和`flatMap()`用于对RDD...
- **转换操作**(Transformation):如`map`、`filter`、`flatMap`、`reduceByKey`等,它们创建新的RDD而不立即执行任何计算,这些操作是懒惰的。 - **行动操作**(Action):如`count`、`collect`、`...
文档详细解释了如何通过Hadoop文件系统(或其他支持的文件系统)中的文件,或者通过驱动程序中的Scala集合来创建RDD。用户也可以请求Spark将RDD持久化在内存中,这样就可以跨多个并行操作高效重用。此外,文档还介绍...
1. **创建SQLContext**:在Scala、Java或Python环境中,你需要先创建一个SQLContext实例。例如,在Scala中,你可以使用以下代码: ```scala import org.apache.spark.sql.SparkSession val spark = SparkSession...
你可以通过两种方式创建RDD:从静态文件或者通过现有RDD进行转换。RDD支持两种主要的操作类型:转换(transformations)和动作(actions)。转换创建新的RDD,而动作触发计算并可能返回结果到驱动程序。 例如,`RDD...
2.RDD 创建 3.RDD 函数及使用 4.RDD 持久化 5.案例:SogouQ日志分析 6.RDD Checkpoint 7.外部数据源(HBase和MySQL) 8.广播变量和累加器 9.Spark 内核调度 10.Spark 并行度 第三章、SparkSQL 模块 1.快速入门:词频...
2. **从文件系统中创建**:如 `sc.textFile`,可以从 HDFS 或本地文件系统读取数据创建 RDD。 转换操作的例子包括: - `map(func)`:对每个元素应用函数 `func`,生成新 RDD。 - `filter(pred)`:根据谓词 `pred` ...
- **通过数据集创建**:你可以从现有数据源(如HDFS、本地文件系统或内存)创建RDD。 - **通过SparkContext操作**:使用`SparkContext`的`textFile()`方法读取文本文件创建RDD。 3. **RDD操作** - **转换操作...
血统记录了RDD创建的操作历史,当数据丢失时,可以通过这些历史操作重新计算丢失的部分。 2. **分布式**:RDD是分布在集群中的数据集,可以被分成多个分区,并且可以在多台机器上并行处理。这种分布式特性使得Spark...
涵盖了创建 RDD 不同的方式如使用列表并行化创建、读取本地和 HDFS 中文件来创建以及对于 RDD 各种常用的数据转换和动作运算,例如map映射、flatmap扁平化处理、filter过滤、take提取前N条记录等等操作方法的演示和...
Spark提供了多种API函数来操作RDD,例如`textFile()`用于从文件系统中读取数据并创建RDD,`map()`用于对每个元素应用函数,`filter()`用于筛选满足条件的元素,`reduceByKey()`用于对键值对RDD进行聚合操作等。...
1. 从外部数据源创建:可以从外部数据源,例如文本文件、数据库等,创建 RDD。 2. 从现有 RDD 转换:可以从现有 RDD 进行转换,例如 filter、map、reduce 等操作。 RDD 的操作 RDD 支持两种类型的操作: 1. 转换...
1. Transformations:RDD的transformation操作是对数据集的转换,用于创建一个新的RDD,输入与输出类型可以不同。 - map(f:T=>U): 对RDD中的每个元素应用一个函数f,返回一个新的RDD。 - flatMap(f:T=>Iterable[U]...