`

16.RDD 实战

阅读更多

由于RDD的不可修改的特性,导致RDD的操作与正常面向对象的操作不同,RDD的操作基本分为3大类:transformation,action,contoller

1.   Transformation

Transformation是通过转化针对已有的RDD创建出新的RDD

map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter(func): 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

flatMap(func):和map差不多,但是flatMap生成的是多个结果

mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition

mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

sample(withReplacement,faction,seed):抽样

union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element

groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型

join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数

cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数

 

Transformation特性:

lazy优化:由于Tranformation的lazy特性,也就是创建不马上运行,对于框架来说,我有足够的时间查看到尽可能多的步骤,看到的步骤越多,优化的空间就越大。最简单的优化方式就是步骤合并,例如本来的做法是a=b*3;b=c*3;c=d*3;d=3,步骤合并后就是a=3*3*3*3。

2.   Action

Action操作的目的是得到一个值,或者一个结果

reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

count():返回的是dataset中的element的个数

first():返回的是dataset中的第一个元素

take(n):返回前n个elements,这个士driverprogram返回的

takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFile(path):把dataset写到一个textfile中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

countByKey():返回的是key对应的个数的一个map,作用于一个RDD

foreach(func):对dataset中的每个元素都使用func

3.   Contoller

Contoller动作主要为持久化RDD,例如cache(),persist(),checkpoint();

具体内容在后续刊物中会讲解。

 

 

 

4.   Spark WordCount动手实践

         本小节通过IDEA具体逐步调试一个WordCount案例,让学员知道各步骤中RDD的具体类型,并为下一节逐步解析做铺垫

(1)     使用的wordCount代码如下:

  1. object WordCount {
  2.   def main (args: Array[String]) {
  3.     val conf = new SparkConf()//create SparkConf
  4.     conf.setAppName("Wow,My First Spark App")//set app name
  5.     conf.setMaster("local")//run local
  6.     val sc =new SparkContext(conf)
  7.     val lines =sc.textFile("C://Users//feng//IdeaProjects//WordCount//src//SparkText.txt")
  8.     val words = lines.flatMap{ lines => lines.split(" ") }
  9.     val pairs =words.map ( word => (word,1) )
  10.     val reduce = pairs.reduceByKey(_+_)
  11.     val sort_1 = reduce.map(pair=>(pair._2,pair._1))
  12.     val sort_2 = sort_1.sortByKey(false)
  13.     val sort_3=sort_2.map(pair=>(pair._2,pair._1))
  14.     val filter=sort_3.filter(pair=>pair._2>2)
  15.     filter.collect.foreach(wordNumberPair => println(wordNumberPair._1+" : "+wordNumberPair._2))
  16.     sc.stop()
  17.   }
  18. }

(1)       程序使用的SparkText.txt文件内容如下

hadoop hadoop hadoop

spark Flink spark

scala scala object

object spark scala

spark spark

Hadoop hadoop

(2)       程序WordCount调试结果:

通过IDEA的逐步调试,会在调试窗口显示每一行代码具体操作什么类型的RDD,此RDD通过什么依赖关系依赖于父RDD等重要信息(如图2-14所示),程序运行结果如图2-15所示。

 

图2-14调试过程图

 

图2-15wordCount结果

2.8.2 解析RDD生成的内部机制

本小节基于上小节程序的调试结果,逐条查看调试信息内容,并基于信息内容进行讲解,并在讲解中回顾并复习本章所有内容。

(1)       line = sc.textFile()

本语句的作用在于从外部数据中读取数据,并生成MapPartitionsRDD。此处需要注意:

如图2-16所示,可以看出次MapPartitionsRDD的deps(dependency,依赖)为HadoopRDD,从这里可以发现其实textFile()过程包含两个步骤,第一步骤将文件内容转化为HadoopRDD(key-value形式,key为行号),第二步骤将HadoopRDD转化为MapPartitionsRDD(value形式,将key-value类型的key删去)

 

图2-16通过HadoopRDD获取数据

(2)       words=line.flatMap()

此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录进行以空格为标记的切分,并把每一个RDD的切分的结果放在一个MapPartitionRDD中

(3)       pairs=words.map(word=>(word,1))

此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录(例:spark(value类型))转换为key-value类型(例: (spark,1)),便于下一步reduceByKey操作

(4)       reduce = pairs.reduceByKey(_+_)

此命令对于RDD采取action(动作)操作,作用在于通过shuffle将pairs中所有的记录按照key相同value相加的规则进行处理,并把结果放到一个shuffleRDD中。例((spark,1),(spark,1))变成((spark,2))。

同时需要注意一下两点:首先本步骤实质上分为两个步骤,第一步骤为local级别的reduce,对当前计算机所拥有的数据先进行reduce操作,生成MapPartitionsRDD;第二步骤为shuffle级别的reduce,基于第一步骤的结果,对结果进行shuffle-reduce操作,生成最终的shuffleRDD。其次 Action操作进行时,对此操作之前的所有转换操作进行执行,所以调试过程中会出现此前的除textFile操作的执行时间均非常短,说明RDD转换操作不直接进行运算。

(5)       sort_1 = reduce.map(pair=>(pair._2,pair._1))

此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的key和value互换,生成一个新的MapPartitionsRDD。例: (spark,2)变为(2,spark)

(6)       sort_2 = sort_1.sortByKey(false)

此命令对于RDD采取action(动作)操作,作用在于将MapPartitionsRDD根据key进行排序,并生成shuffleRDD

(7)       sort_3=sort_2.map(pair=>(pair._2,pair._1))

此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的key和value互换,生成一个新的MapPartitionsRDD。例: (2,spark)变为(spark,2)

(8)       filter=sort_3.filter(pair=>pair._2>2)

此命令对于RDD采取transformation(转换)操作,作用在于根据value值筛选MapPartitionsRDD中的数据,输出value大于2的记录

(9)       最后通过collect()方法将结果收集后,使用foreach()方法遍历数据并通过println()方法打印出所有数据。

 

注:本内容原型来自 IMP 课程笔记

如果技术上有什么疑问,欢迎加我QQ交流: 1106373297 
分享到:
评论

相关推荐

    spark rdd 实战 ,基本语法

    Spark RDD 实战、基本语法 本文将对 Spark RDD 进行深入浅出的讲解,涵盖 Spark 的基本特性、生态体系、优势、支持的 API、运行模式、RDD 的概念和类型、容错 Lineage、缓存策略等知识点。 Spark 的基本特性 ...

    Spark分布式内存计算框架视频教程

    知识点介绍、代码演示、逻辑分析、灵活举例、使用图形的方式详细演示代码的流程和细节、整合企业级实战案例,全面讲解并突出重点,让学习也变成一种快乐。 课程亮点 1,知识体系完备,阶段学习者都能学有所获。 2,...

    spark商业实战三部曲

    章 电光石火间体验Spark 2.2开发实战... 2 1.1 通过RDD实战电影点评系统入门及源码阅读... 2 1.1.1 Spark核心概念图解... 2 1.1.2 通过RDD实战电影点评系统案例... 4 1.2 通过DataFrame和DataSet实战电影点评...

    spark2.x实战

    ### Spark2.x实战知识点梳理 #### 一、Spark概述 **1.1 Spark简介** - **定义**: Apache Spark 是一个开源的大规模数据处理框架。它提供了高性能的数据处理能力,并且支持多种编程语言如 Scala、Java 和 Python。...

    03_SparkRDD(RDD编程实战)

    Spark RDD(弹性分布式数据集)是Apache Spark的核心概念,它是一种可分区、容错并可以在集群上并行处理的数据集合。在本实例中,我们将学习如何使用Python接口PySpark来处理RDD,通过实现三个基本的运营案例:计算...

    PySpark_Day04:RDD Operations & Shared Variables.pdf

    PySpark_Day04:RDD Operations & Shared Variables 主要讲解了 RDD 算子、RDD 共享变量、综合实战案例及 Spark 内核调度。 知识点1:RDD 概念 RDD(Resilient Distributed Dataset)是 Spark Core 中的核心概念。...

    10 实战解析spark运行原理和RDD解密

    "Spark 运行原理和 RDD 解密" Spark 是一个分布式计算框架,基于内存和磁盘,特别适合于迭代计算。Spark 的运行原理可以分为两大部分:Driver 端和 Executor 端。Driver 端负责提交任务,Executor 端负责执行任务...

    电影评分数据汇总(使用spark2.4+scala, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    3、不仅适合小白学习实战练习,也可作为大作业、课程设计、毕设项目、初期项目立项演示等,欢迎下载,互相学习,共同进步! 电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准...

    大数据处理中PySpark操作与实战案例:RDD创建及基本操作教程

    内容概要:本文档主要介绍如何利用 PySpark 对 RDD (弹性分布式数据集)进行一系列基本操作。涵盖了创建 RDD 不同的方式如使用列表并行化创建、读取本地和 HDFS 中文件来创建以及对于 RDD 各种常用的数据转换和动作...

    Python大数据处理库 PySpark实战-源代码.rar

    - RDD(弹性分布式数据集):Spark的核心数据结构,是存储在集群中的不可变、分区的数据集合。 - DataFrame和Dataset:Spark 2.0引入的新数据抽象,提供了更高级别的API,使得数据处理更加面向对象和类型安全。 2...

    Learning.Spark.pdf(英文版)+图解Spark核心技术与案例实战.pdf

    《Learning Spark》与《图解Spark核心技术与案例实战》两本书是深入了解和学习Apache Spark的重要资源,它们分别从英文和中文角度提供了丰富的Spark知识。Spark作为一个分布式计算框架,以其高效、易用和多模态处理...

    Spark机器学习案例实战.pdf

    Dataset API结合了RDD(弹性分布式数据集)的类型安全优势以及DataFrame的易用性和优化性能。 5. Spark环境配置和spark-shell 在进行Spark编程之前,通常需要对Spark环境进行配置,比如设置executor-memory(执行器...

    spark大数据商业实战三部曲源码及资料.zip

    - RDD(弹性分布式数据集)的创建、转换和行动操作的实现细节。 四、实战应用 1. 数据分析:通过Spark SQL进行ETL(提取、转换、加载)操作,对海量数据进行清洗和预处理。 2. 实时流处理:利用Spark Streaming处理...

    Spark大数据分析与实战课后练习答案.rar

    《Spark大数据分析与实战》课程是一门深入探讨Apache Spark在大数据处理领域的应用和技术的课程,其课后练习答案集提供了对课程所讲授知识的巩固和实践。这是一份珍贵的配套教学资源,旨在帮助学生更好地理解和掌握...

    Python大数据分析&人工智能教程 - Spark-RDD案例分析和实战(含案例源码和学习思维导图)

    词频案例分析 wc 场景1. 统计单个文件 1) input 1/n个文件 ... 把每个文件传入到textFile, 得到rdd 重复如下方法: 文本内容的每一行转成一个个单词:flatMap 算子 一维 单词 ===》(单词, 1): map 算子 二维

    Spark实战.docx

    RDD是Spark的核心数据结构,它是不可变的、分区的数据集,可以由集合转换、文件系统或父RDD转换生成。RDD的操作主要有Transformation和Action两类,Transformation是延迟执行的,只有在触发Action时,才会实际执行...

    Spark实战高手之路.rar

    《Spark实战高手之路》这本书是针对大数据处理领域中Apache Spark技术的一本深度实践指南。Spark作为当前大数据处理领域的热门框架,以其高效、易用和强大的并行计算能力赢得了广大开发者和数据科学家的喜爱。本书...

    Python大数据处理库 PySpark实战

    **Python大数据处理库 PySpark 实战** 在大数据领域,PySpark是Python编程语言与Apache Spark框架相结合的重要工具,它提供了Python API,使得开发者能够利用Spark的强大功能进行数据处理。PySpark广泛应用于数据...

Global site tag (gtag.js) - Google Analytics