`
superlxw1234
  • 浏览: 553305 次
  • 性别: Icon_minigender_1
  • 来自: 西安
博客专栏
Bd1c0a0c-379a-31a8-a3b1-e6401e2f1523
Hive入门
浏览量:44687
社区版块
存档分类
最新评论

Spark算子:统计RDD分区中的元素及数量

阅读更多

关键字:Spark算子、Spark RDD分区、Spark RDD分区元素数量

 

 

Spark RDD是被分区的,在生成RDD时候,一般可以指定分区的数量,如果不指定分区数量,当RDD从集合创建时候,则默认为该程序所分配到的资源的CPU核数,如果是从HDFS文件创建,默认为文件的Block数。

 

可以利用RDD的mapPartitionsWithIndex方法来统计每个分区中的元素及数量。

 

关于mapPartitionsWithIndex的介绍可以参考 mapPartitionsWithIndex的介绍

 

http://lxw1234.com/archives/2015/07/348.htm

 

 

具体看例子:

 

 

//创建一个RDD,默认分区15个,因为我的spark-shell指定了一共使用15个CPU资源
//–total-executor-cores 15

 

 

scala> var rdd1 = sc.makeRDD(1 to 50)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at :21
 
scala> rdd1.partitions.size
res15: Int = 15

 

 

//统计rdd1每个分区中元素数量

rdd1.mapPartitionsWithIndex{
        (partIdx,iter) => {
          var part_map = scala.collection.mutable.Map[String,Int]()
            while(iter.hasNext){
              var part_name = "part_" + partIdx;
              if(part_map.contains(part_name)) {
                var ele_cnt = part_map(part_name)
                part_map(part_name) = ele_cnt + 1
              } else {
                part_map(part_name) = 1
              }
              iter.next()
            }
            part_map.iterator
           
        }
      }.collect

res16: Array[(String, Int)] = Array((part_0,3), (part_1,3), (part_2,4), (part_3,3), 
(part_4,3), (part_5,4), (part_6,3), (part_7,3), (part_8,4), (part_9,3), (part_10,3), 
(part_11,4), (part_12,3), (part_13,3), (part_14,4))

//从part_0到part_14,每个分区中的元素数量

 

//统计rdd1每个分区中有哪些元素

 

rdd1.mapPartitionsWithIndex{
        (partIdx,iter) => {
          var part_map = scala.collection.mutable.Map[String,List[Int]]()
            while(iter.hasNext){
              var part_name = "part_" + partIdx;
              var elem = iter.next()
              if(part_map.contains(part_name)) {
                var elems = part_map(part_name)
                elems ::= elem
                part_map(part_name) = elems
              } else {
                part_map(part_name) = List[Int]{elem}
              }
            }
            part_map.iterator
           
        }
      }.collect
res17: Array[(String, List[Int])] = Array((part_0,List(3, 2, 1)), (part_1,List(6, 5, 4)), 
(part_2,List(10, 9, 8, 7)), (part_3,List(13, 12, 11)), (part_4,List(16, 15, 14)), 
(part_5,List(20, 19, 18, 17)), (part_6,List(23, 22, 21)), (part_7,List(26, 25, 24)), 
(part_8,List(30, 29, 28, 27)), (part_9,List(33, 32, 31)), (part_10,List(36, 35, 34)), 
(part_11,List(40, 39, 38, 37)), (part_12,List(43, 42, 41)), (part_13,List(46, 45, 44)), 
(part_14,List(50, 49, 48, 47)))
//从part_0到part14,每个分区中包含的元素

 

//从HDFS文件创建的RDD,包含65个分区,因为该文件由65个Block

scala> var rdd2 = sc.textFile("/logs/2015-07-05/lxw1234.com.log")
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at textFile at :21
 
scala> rdd2.partitions.size
res18: Int = 65

 

//rdd2每个分区的元素数量

 

scala> rdd2.mapPartitionsWithIndex{
     |         (partIdx,iter) => {
     |           var part_map = scala.collection.mutable.Map[String,Int]()
     |             while(iter.hasNext){
     |               var part_name = "part_" + partIdx;
     |               if(part_map.contains(part_name)) {
     |                 var ele_cnt = part_map(part_name)
     |                 part_map(part_name) = ele_cnt + 1
     |               } else {
     |                 part_map(part_name) = 1
     |               }
     |               iter.next()
     |             }
     |             part_map.iterator
     |            
     |         }
     |       }.collect
res19: Array[(String, Int)] = Array((part_0,202496), (part_1,225503), (part_2,214375), 
(part_3,215909), (part_4,208941), (part_5,205379), (part_6,207894), (part_7,209496), 
(part_8,213806), (part_9,216962), (part_10,216091), (part_11,215820), (part_12,217043), 
(part_13,216556), (part_14,218702), (part_15,218625), (part_16,218519), (part_17,221056), 
(part_18,221250), (part_19,222092), (part_20,222339), (part_21,222779), (part_22,223578), 
(part_23,222869), (part_24,221543), (part_25,219671), (part_26,222871), (part_27,223200), 
(part_28,223282), (part_29,228212), (part_30,223978), (part_31,223024), (part_32,222889), 
(part_33,222106), (part_34,221563), (part_35,219208), (part_36,216928), (part_37,216733), 
(part_38,217214), (part_39,219978), (part_40,218155), (part_41,219880), (part_42,215833...

 

更多关于Spark算子的介绍,可参考 Spark算子

http://lxw1234.com/archives/tag/spark%E7%AE%97%E5%AD%90

 

1
1
分享到:
评论

相关推荐

    25个经典Spark算子的JAVA实现

    根据给定文件的信息,本文将详细介绍25个经典Spark算子的Java实现,并结合详细的注释及JUnit测试结果,帮助读者更好地理解Spark算子的工作原理及其应用方式。 ### Spark算子简介 在Apache Spark框架中,算子是用于...

    Spark常用的算子以及Scala函数总结.pdf

    在 Spark 中,数据以弹性分布式数据集(RDD)的形式存在,RDD 是 Spark 的核心数据结构,它是一个不可变的分布式对象集合,能够在节点之间进行有效的容错处理。 Spark 算子大致分为两类:Transformation 算子和 ...

    【SparkCore篇03】RDD行动算子1

    这对于统计数据规模非常有用,例如,我们可以通过`count`算子计算一个RDD的元素数量。 4. `first`算子: `first`算子返回RDD中的第一个元素。这在需要快速获取数据集的代表值时非常方便。 5. `take`算子: `take...

    Spark算子的详细使用方法

    count 算子统计 RDD 中的元素个数。first 算子返回 RDD 中的第一个元素。take 算子返回 RDD 中的前 n 个元素。 在 Spark 编程中,我们可以使用 Transformation 算子来处理数据,然后使用 Action 算子来触发作业提交...

    spark算子基础讲义1

    Spark 算子是 Spark 框架中的一种数据处理单元,它可以将数据从一个RDD(Resilient Distributed Dataset)转换为另一个RDD。Spark 算子可以分为两大类: narrow dependency 算子和 wide dependency 算子。Narrow ...

    Spark大数据处理学习笔记

    本篇笔记主要涵盖了 Spark 大数据处理的学习笔记,包括了 Spark Standalone 集群的搭建、RDD 的创建和算子、RDD 的分区、RDD 典型案例、IDEA 开发词频统计项目等方面的知识点。 一、Spark Standalone 集群的搭建 *...

    spark基本算子操作

    - **示例**:对于一个包含多个分区的RDD,可以使用`mapPartitions`对每个分区进行特定的处理,例如统计每个分区的元素数量。 5. **`mapPartitionsWithIndex(func)`** - **功能**:`mapPartitionsWithIndex(func)...

    经典Spark算子的JAVA实现.zip

    6. **countByKey()**: 对于分组后的数据,可以使用此算子统计每个键对应的元素数量。 7. **join()**: join操作用于将两个数据集按照共同的键进行连接,返回一个新的数据集,包含所有匹配的键值对。 8. **distinct...

    PySpark_Day04:RDD Operations & Shared Variables.pdf

    PySpark_Day04:RDD Operations & Shared Variables 主要讲解了 RDD 算子、RDD 共享变量、综合实战案例及 Spark 内核调度。 知识点1:RDD 概念 RDD(Resilient Distributed Dataset)是 Spark Core 中的核心概念。...

    spark算子.docx

    在上面的例子中,我们使用 map 算子将一个包含四个元素的数组分配到两个分区中,然后对每个元素进行操作,模拟将数据写入数据库的过程。结果显示,每个元素都被处理了一次,每个元素都执行了一次写入数据库的操作。 ...

    【SparkCore篇02】RDD转换算子1

    1. **map()**:map算子用于对RDD(Resilient Distributed Dataset)中的每个元素进行操作。它接受一个函数作为参数,这个函数会应用到RDD的每一个元素上,然后返回一个新的RDD,其元素是原始RDD元素经过函数转换后的...

    spark考试(练习题)编程题笔记!

    4. (Key,Value)数据类型的RDD分区器:只有对于(Key,Value)的RDD,才会有Partitioner,非k-v的RDD的Partitioner的值是None。 5. 每个分区都有一个优先位置列表:''移动数据不如移动计算'' 二、RDD获取数据方式 ...

    Spark Transformation和Action算子速查表.pdf

    在Spark中,数据通常以RDD(弹性分布式数据集)的形式存在,并通过两种类型的算子进行处理:Transformation(转换)算子和Action(行动)算子。 **Transformation算子**:这些算子用于创建一个新的RDD。它们的操作...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 本实验报告主要介绍了在 Windows 环境下安装 Spark 及 RDD 编程和 Spark 编程实现 wordcount 的步骤和过程。实验中首先安装了 ...

    SparkDemo-1:spark的算子调用

    - `count()`: 计算RDD中的元素数量。 - `collect()`: 将RDD的所有元素收集到driver程序,通常用于小规模数据的查看。 - `saveAsTextFile()`: 将RDD内容保存为文本文件。 - `first()`: 获取RDD的第一个元素。 - ...

    spark-RDD的特性介绍及源码阅读必备基础

    Spark中的弹性分布式数据集(Resilient Distributed Dataset, RDD)是其核心抽象概念,它代表了一个不可变、分区的记录集合,可以在集群中并行处理。RDD的主要特性包括以下几个方面: 1. **分片列表**:RDD是由多个...

    Spark学习笔记

    - **概念**:RDD是Spark中最基本的数据抽象,是一种只读的、分布式的数据集合。 - **特性**: - 分布式存储:由多个分区组成,每个分区可以在不同的节点上。 - 容错性:通过依赖关系可以重建丢失的数据。 - 并行...

    RDD&SparkCore笔记.docx

    - RDD在代码中表现为抽象类,表示不可变、可分区的集合,并且其元素支持并行计算。 - RDD的主要功能是提供数据的并行处理和容错机制。 2. **RDD的属性** - **分区**:数据集由一组分区组成,每个分区是数据的...

    spark-textFile构建RDD的分区及compute计算策略

    Spark-textFile 构建 RDD 的分区及 Compute 计算策略 Spark-textFile 是 Spark 中的一种常用方法,用于从文本文件中构建 RDD。它的主要作用是将文本文件的内容读取出来,并将其转换为 RDD,以便进行后续的数据处理...

    【大数据学习资料】Spark单value,key-value类型21个算子(图解与源码).zip

    RDD,即弹性分布式数据集,是Spark的基本数据抽象,它是不可变且分区的。在处理数据时,Spark提供了丰富的算子,包括转换操作和动作操作。转换操作不改变原有RDD,而是创建新的RDD;动作操作则会触发实际计算,并...

Global site tag (gtag.js) - Google Analytics