`
WindyQin
  • 浏览: 32371 次
  • 性别: Icon_minigender_1
  • 来自: 郑州
社区版块
存档分类
最新评论

Spark的这些事<三>——spark常用的Transformations 和Actions

 
阅读更多

Transformations

map,filter
spark最长用的两个Transformations:map,filter,下面就来介绍一下这两个。

先看下面这张图:

这里写图片描述

从上图中可以清洗的看到 map和filter都是做的什么工作,那我们就代码演示一下。

    val input = sc.parallelize(List(1,2,3,4))

    val result1 = input.map(x=>x*x)
    val result2 = input.filter(x=>x!=1)

    print(result1.collect().mkString(","))
    print("\n")
    print(result2.collect().mkString(","))
    print("\n")

执行结果如下:

16/08/17 18:48:31 INFO DAGScheduler: ResultStage 0 (collect at Map.scala:17) finished in 0.093 s
16/08/17 18:48:31 INFO DAGScheduler: Job 0 finished: collect at Map.scala:17, took 0.268871 s
1,4,9,16
........
16/08/17 18:48:31 INFO DAGScheduler: ResultStage 1 (collect at Map.scala:19) finished in 0.000 s
16/08/17 18:48:31 INFO DAGScheduler: Job 1 finished: collect at Map.scala:19, took 0.018291 s
2,3,4

再回头看下上面那张图,是不是明白什么意思了!

flatMap
另外一个常用的就是flatMap,输入一串字符,分割出每个字符

map和flatmap的区别

来用代码实践一下:

    val lines = sc.parallelize(List("hello world","hi"))
    val words = lines.flatMap (lines=>lines.split(" "))
    print(words.first())
    print("\n")

执行结果:

16/08/17 19:23:24 INFO DAGScheduler: Job 2 finished: first at Map.scala:24, took 0.016987 s
hello
16/08/17 19:23:24 INFO SparkContext: Invoking stop() from shutdown hook

分隔符如果改一下的话:

val words = lines.flatMap (lines=>lines.split(","))

结果会怎样呢?

16/08/17 19:33:14 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
hello world
16/08/17 19:33:14 INFO SparkContext: Invoking stop() from shutdown hook

和想象的一样吧~

distinct,distinct,intersection,subtract
还有几个比较常用的:distinct,distinct,intersection,subtract

这里写图片描述

来看看代码实践:

val rdd1 = sc.parallelize(List("coffee","coffee","panda","monkey","tea"))
    val rdd2 = sc.parallelize(List("coffee","monkey","kitty"))

    rdd1.distinct().take(100).foreach(println)

结果:

16/08/17 19:52:29 INFO DAGScheduler: ResultStage 4 (take at Map.scala:30) finished in 0.047 s
16/08/17 19:52:29 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
16/08/17 19:52:29 INFO DAGScheduler: Job 3 finished: take at Map.scala:30, took 0.152405 s
monkey
coffee
panda
tea
16/08/17 19:52:29 INFO SparkContext: Starting job: take at Map.scala:32

代码:

 rdd1.union(rdd2).take(100).foreach(println)

结果:

6/08/17 19:52:29 INFO DAGScheduler: Job 5 finished: take at Map.scala:32, took 0.011825 s
coffee
coffee
panda
monkey
tea
coffee
monkey
kitty
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:34
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 11 (intersection at Map.scala:34)
16/08/17 19:52:30 INFO DAGScheduler: Registering RDD 12 (intersection at Map.scala:34)

代码:

rdd1.intersection(rdd2).take(100).foreach(println)

结果:

16/08/17 19:52:30 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 9) in 31 ms on localhost (1/1)
16/08/17 19:52:30 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool 
16/08/17 19:52:30 INFO DAGScheduler: ResultStage 9 (take at Map.scala:34) finished in 0.031 s
16/08/17 19:52:30 INFO DAGScheduler: Job 6 finished: take at Map.scala:34, took 0.060785 s
monkey
coffee
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:36

代码:

rdd1.subtract(rdd2).take(100).foreach(println)

结果:

16/08/17 19:52:30 INFO DAGScheduler: Job 6 finished: take at Map.scala:34, took 0.060785 s
monkey
coffee
16/08/17 19:52:30 INFO SparkContext: Starting job: take at Map.scala:36

再看看上面的图,很容易理解吧

Actions

常用的Transformations就介绍到这里,下面介绍下常用的Action:
reduce,countByValue,takeOrdered,takeSample,aggregate

首先看一下:reduce

    val rdd5 = sc.parallelize(List(1,2,3,4))
    print("reduce action:"+rdd5.reduce((x,y)=>x+y)+"\n")
16/08/18 11:51:16 INFO DAGScheduler: Job 15 finished: reduce at Function.scala:55, took 0.012698 s
reduce action:10
16/08/18 11:51:16 INFO SparkContext: Starting job: aggregate at Function.scala:57

countByValue

print(rdd1.countByValue() + "\n")
16/08/18 11:51:16 INFO DAGScheduler: Job 11 finished: countByValue at Function.scala:48, took 0.031726 s
Map(monkey -> 1, coffee -> 2, panda -> 1, tea -> 1)
16/08/18 11:51:16 INFO SparkContext: Starting job: takeOrdered at Function.scala:50

takeOrdered

rdd1.takeOrdered(10).take(100).foreach(println)
16/08/18 11:51:16 INFO DAGScheduler: Job 12 finished: takeOrdered at Function.scala:50, took 0.026160 s
coffee
coffee
monkey
panda
tea
16/08/18 11:51:16 INFO SparkContext: Starting job: takeSample at Function.scala:52

aggregate
这个要重点介绍一下:

Spark文档中aggregate函数定义如下
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue. seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

val rdd5 = sc.parallelize(List(1,2,3,4))
val rdd6 = rdd5.aggregate((0, 0))  ((x, y) =>(x._1 + y, x._2+1),  (x, y) =>(x._1 + y._1, x._2 + y._2))
    print ("aggregate action : " + rdd6 + "\n"  )

我们看一下结果:

16/08/18 11:51:16 INFO DAGScheduler: Job 16 finished: aggregate at Function.scala:57, took 0.011686 s
aggregate action : (10,4)
16/08/18 11:51:16 INFO SparkContext: Invoking stop() from shutdown hook

我们可以根据以上执行的例子来理解aggregate 用法:

  • 第一步:将rdd5中的元素与初始值遍历进行聚合操作
    • 第二步:将初始值加1进行遍历聚合
    • 第三步:将结果进行聚合
    • 根据本次的RDD 背部实现如下:
    • 第一步:其实是0+1
    • 1+2
    • 3+3
    • 6+4
    • 然后执行:0+1
    • 1+1
    • 2+1
    • 3+1
    • 此时返回(10,4)
    • 本次执行是一个节点,如果在集群中的话,多个节点,会先把数据打到不同的分区上,比如(1,2) (3,4)
    • 得到的结果就会是(3,2) (7,2)
    • 然后进行第二步combine就得到 (10,4)

这样你应该能理解aggregate这个函数了吧

以上就是对常用的Transformations 和Actions介绍,对于初学者来说,动手代码实践各个函数,才是明白其功能最好的方法。

PS :源码

Spark的这些事系列文章:
Spark的这些事<一>——Windows下spark开发环境搭建
Spark的这些事<二>——几个概念
Spark的这些事<三>——spark常用的Transformations 和Actions

<script type="text/javascript"> $(function () { $('pre.prettyprint code').each(function () { var lines = $(this).text().split('\n').length; var $numbering = $('<ul/>').addClass('pre-numbering').hide(); $(this).addClass('has-numbering').parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($('<li/>').text(i)); }; $numbering.fadeIn(1700); }); }); </script>

分享到:
评论

相关推荐

    Spark入门(Python).pdf

    3. **弹性分布式数据集(RDD)**:RDD是Spark的核心抽象,是一种容错的、只读的数据集合,可以通过转换操作(transformations)和动作操作(actions)进行处理。 4. **多工作负载支持**:Spark支持SQL查询(通过...

    Spark编程指南中文版

    《Spark编程指南中文版》是针对大数据处理领域的重要参考...这本书的三种格式——epub、mobi和pdf,适应不同的阅读设备和偏好。无论是初学者还是经验丰富的Spark开发者,都可以从中受益,掌握大数据处理的关键技能。

    大数据--Apache Spark编程详解

    二是支持两种类型的操作——转换(Transformations)和动作(Actions),这些操作使得开发者能够灵活地对数据进行处理。 ##### 2. 创建RDD - **从本地集合创建**:可以直接从Python列表等本地集合创建RDD。 - **通过...

    spark全案例

    3. RDD(弹性分布式数据集)操作:RDD是Spark的基本数据抽象,包括转换(transformations)和动作(actions)两种操作。 三、数据读写 1. 读取数据:可以使用Java API从多种数据源(如HDFS、Cassandra、HBase等)...

    Patrick Wendell:Spark Performance

    他首先介绍了Spark的运行机制,包括如何创建RDD(弹性分布式数据集), 以及如何对这些RDD进行转换(transformations)和执行动作(actions)。这里强调了RDD作为Spark中的基本并行集合,它们可以被分割成多个分区...

    spark培训ppt

    - **编程模型**:提供了丰富的 API,包括转换(transformations)和行动(actions)两种类型的操作,支持高效的并行数据处理。 - **运行模式**:支持多种部署模式,如 Local(适合开发和测试)、Standalone(自管理...

    从小火苗到燃烧的火车:Spark大数据处理秘籍.zip

    spark本文将为您介绍一款强大的大数据处理工具——Spark。它以独特的火苗形象,为大数据处理提供了快速、通用和可扩展的能力。本文将围绕Spark的基本概念、使用方法和功能进行讲解,帮助您快速上手Spark。 知识领域...

    02_SparkCore.docx

    2. **转换操作**:Spark提供丰富的转换操作(transformations),如map、filter、join等,这些操作建立RDD间的血缘关系。 3. **行动操作**:行动操作(actions)如count、collect等,触发计算并返回结果,或保存数据...

    2016012743_王宇轩_大数据实习二.zip

    例如,通过`SparkSession`创建Spark上下文,定义RDD,使用`transformations`定义数据处理流程,最后通过`actions`触发计算。在实践中,需要优化数据处理流程,避免shuffle操作过多,减少网络传输和磁盘I/O。 在...

    spark-scala-tutorial:Apache Spark的免费教程

    本教程“spark-scala-tutorial-master”将涵盖这些基础以及更多进阶主题,例如 Spark SQL、Spark Streaming、MLlib(机器学习库)和 GraphX。通过实践,你将掌握如何在 Scala 中有效地使用 Spark 进行大数据处理,为...

    sparkspall:spark项目实践

    同时,理解并利用RDD的transformations和actions,减少不必要的数据转换。 3.3 内存管理和持久化 通过调整Spark的内存分配策略,以及合理使用RDD的持久化级别,可以提高内存利用率和程序运行效率。 四、错误调试...

    LearningPySpark_Code.zip

    2. DataFrame转换与行动:DataFrame有两种操作,转换(Transformations)和行动(Actions)。转换会创建一个新的DataFrame,而不会立即执行任何计算;行动则会触发Spark计算,返回结果。这种延迟执行模式有助于优化...

    spark_learning

    《Spark学习指南——基于Java的深度解析》 Spark作为一个分布式计算框架,已经在大数据处理领域扮演了重要的角色。它以其高效、易用和可扩展性获得了广泛赞誉,尤其在实时数据处理和批处理方面表现突出。本篇将深入...

    HacklabAN-spark-one:在首次演示中使用的有关火花和大数据的源代码,@ Hacklab Ancona,意大利-one source code

    3. **转换和操作(Transformations and Actions)**:Spark的核心在于其惰性计算模型,这里会包含各种RDD(弹性分布式数据集)的操作,如map、filter、reduceByKey等。 4. **配置文件(Configuration Files)**:如`...

    基于人工智能的药物分子筛选——比赛代码第15名.zip

    在这个名为“基于人工智能的药物分子筛选——比赛代码第15名.zip”的压缩包中,我们很显然关注的是一个关于利用人工智能技术进行药物分子筛选的竞赛项目。该项目可能包含参赛者所使用的代码、数据集、模型解释以及...

    大数据加法运算器

    通过使用Spark的transformations和actions,可以便捷地实现大数据集的加法运算。 另外,还有其他工具如Apache Flink和Apache Storm,它们支持实时流数据处理,也可以执行大数据加法运算。在流处理中,数据不断流入...

    fhuwehfiuwehfui

    - **大数据处理**:Hadoop、Spark用于大规模数据处理和分析。 - **数据可视化**:Tableau、Power BI用于将数据转化为易于理解的图形。 以上是一些广泛的IT知识点,具体到您的`fhuwehfiuwehfui-main`项目,可能...

Global site tag (gtag.js) - Google Analytics