第15课:RDD创建内幕
1. RDD的创建方式
Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法
Spark中的基本方式:
1) 使用程序中的集合创建
这种方式的实际意义主要用于测试。
2) 使用本地文件系统创建
这种方式的实际意义主要用于测试大量数据的文件
3) 使用HDFS创建RDD
这种方式为生产环境中最常用的创建RDD的方式
4) 基于DB创建
5) 基于NoSQL:例如HBase
6) 基于S3(SC3)创建
7) 基于数据流创建
2. RDD创建实战
1) 通过集合创建
代码:
object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val numbers = 1 to 100 //创建一个Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_) //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
}
}
结果:
2) 通过本地文件系统创建
代码:
object RDDBasedOnLocalFile { def main (args: Array[String]) { val conf = new SparkConf()//create SparkConf conf.setAppName("RDDBasedOnCollection")//set app name conf.setMaster("local")//run local val sc =new SparkContext(conf) val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt") val linesLength=rdd.map(line=>line.length()) val sum = linesLength.reduce(_+_) println("the total characters of the file"+"="+sum) } }
结果:
3) 通过HDFS创建RDD
代码:
val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)
结果:
关于spark并行度:
1.默认并行度为程序分配到的cpu core的数目
2.可以手动设置并行度,并行度最佳实践
1. 2-4 partitions for each CPU core
2.综合考虑cpu和内存
注:本内容原型来自 IMP 课程笔记
相关推荐
1. 创建RDD:这通常是通过读取外部数据集或在Driver程序中分布对象集合(如列表或集合)来完成。 2. 转换操作(Transformation):这类操作会生成新的RDD,但并不会立即执行计算。例如,`filter()`、`map()`、`...
1. 从外部数据创建出输入RDD; 2. 使用诸如filter()等这样的转化操作对RDD进行转化,以定义新的RDD; 3. 告诉Spark对需要被重用的中间结果RDD执行persist()操作; 4. 使用诸如first()等这样的行动操作来触发一次并行...
这些测试通常会包括创建测试数据,调用自定义RDD的函数,然后检查结果是否符合预期。测试对于确保代码的正确性和性能至关重要,特别是在分布式环境下,它们可以帮助发现潜在的问题并优化性能。 自定义RDD从HDFS读取...
根据提供的文档内容,我们可以深入探讨有关Resilient Distributed Datasets(简称RDD)的相关知识点,特别是其创建方式与转换操作。 ### 1. RDD 创建 #### 1.1 从列表创建 RDD 在Spark中,可以通过从一个Python...
但在实际的RDD编程中,我们还需要了解如何创建、转换和行动RDD。创建RDD通常通过`parallelize()`函数,将已有的数据集转化为分布式数据集。转换操作(transformations)如`map()`、`filter()`和`flatMap()`用于对RDD...
- **转换操作**(Transformation):如`map`、`filter`、`flatMap`、`reduceByKey`等,它们创建新的RDD而不立即执行任何计算,这些操作是懒惰的。 - **行动操作**(Action):如`count`、`collect`、`...
文档详细解释了如何通过Hadoop文件系统(或其他支持的文件系统)中的文件,或者通过驱动程序中的Scala集合来创建RDD。用户也可以请求Spark将RDD持久化在内存中,这样就可以跨多个并行操作高效重用。此外,文档还介绍...
你可以通过两种方式创建RDD:从静态文件或者通过现有RDD进行转换。RDD支持两种主要的操作类型:转换(transformations)和动作(actions)。转换创建新的RDD,而动作触发计算并可能返回结果到驱动程序。 例如,`RDD...
2.RDD 创建 3.RDD 函数及使用 4.RDD 持久化 5.案例:SogouQ日志分析 6.RDD Checkpoint 7.外部数据源(HBase和MySQL) 8.广播变量和累加器 9.Spark 内核调度 10.Spark 并行度 第三章、SparkSQL 模块 1.快速入门:词频...
2. **从文件系统中创建**:如 `sc.textFile`,可以从 HDFS 或本地文件系统读取数据创建 RDD。 转换操作的例子包括: - `map(func)`:对每个元素应用函数 `func`,生成新 RDD。 - `filter(pred)`:根据谓词 `pred` ...
- **通过数据集创建**:你可以从现有数据源(如HDFS、本地文件系统或内存)创建RDD。 - **通过SparkContext操作**:使用`SparkContext`的`textFile()`方法读取文本文件创建RDD。 3. **RDD操作** - **转换操作...
血统记录了RDD创建的操作历史,当数据丢失时,可以通过这些历史操作重新计算丢失的部分。 2. **分布式**:RDD是分布在集群中的数据集,可以被分成多个分区,并且可以在多台机器上并行处理。这种分布式特性使得Spark...
1. 从外部数据源创建:可以从外部数据源,例如文本文件、数据库等,创建 RDD。 2. 从现有 RDD 转换:可以从现有 RDD 进行转换,例如 filter、map、reduce 等操作。 RDD 的操作 RDD 支持两种类型的操作: 1. 转换...
1. Transformations:RDD的transformation操作是对数据集的转换,用于创建一个新的RDD,输入与输出类型可以不同。 - map(f:T=>U): 对RDD中的每个元素应用一个函数f,返回一个新的RDD。 - flatMap(f:T=>Iterable[U]...
Transformation操作用于创建新的RDD,并且这些操作是懒加载执行的,也就是说它们并不会立即被执行,而是等到Action操作触发时才会真正运行。以下是一些常用的Transformation操作: 1. **`map(func)`** - 描述:对...
- `sc = SparkContext(master = "local[2]")`:创建SparkContext实例,`master`参数指定运行模式,`local[2]`表示在本地模式下运行,使用2个线程。 2. **获取RDD信息**: - `rdd.getNumPartitions()`:返回RDD的...
转换操作创建新的RDD而不立即执行,只有当行动操作被调用时,Spark才会执行转换操作,并将结果返回给驱动程序或者写入到外部存储。 - 转换操作包括:map()、filter()、flatMap()、distinct()、reduceByKey()、...